From 5a6822c8acbd7353729cbb5d8393de2f6da39425 Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 13 Nov 2018 20:32:51 +0400 Subject: [PATCH] abci: localClient improvements & bugfixes & pubsub Unsubscribe issues (#2748) * use READ lock/unlock in ConsensusState#GetLastHeight Refs #2721 * do not use defers when there's no need * fix peer formatting (output its address instead of the pointer) ``` [54310]: E[11-02|11:59:39.851] Connection failed @ sendRoutine module=p2p peer=0xb78f00 conn=MConn{74.207.236.148:26656} err="pong timeout" ``` https://github.com/tendermint/tendermint/issues/2721#issuecomment-435326581 * panic if peer has no state https://github.com/tendermint/tendermint/issues/2721#issuecomment-435347165 It's confusing that sometimes we check if peer has a state, but most of the time we expect it to be there: 1. https://github.com/tendermint/tendermint/blob/add79700b5fe84417538202b6c927c8cc5383672/mempool/reactor.go#L138 2. https://github.com/tendermint/tendermint/blob/add79700b5fe84417538202b6c927c8cc5383672/rpc/core/consensus.go#L196 (edited) I will change everything to always assume peer has a state and panic otherwise; that should help identify issues earlier * abci/localclient: extend lock on app callback App callback should be protected by lock as well (note this was already done for InitChainAsync, why not for others???).
Otherwise, when we execute the block, tx might come in and call the callback at the same time we're updating it in execBlockOnProxyApp => DATA RACE Fixes #2721 Consensus state is locked ``` goroutine 113333 [semacquire, 309 minutes]: sync.runtime_SemacquireMutex(0xc00180009c, 0xc0000c7e00) /usr/local/go/src/runtime/sema.go:71 +0x3d sync.(*RWMutex).RLock(0xc001800090) /usr/local/go/src/sync/rwmutex.go:50 +0x4e github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).GetRoundState(0xc001800000, 0x0) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:218 +0x46 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusReactor).queryMaj23Routine(0xc0017def80, 0x11104a0, 0xc0072488f0, 0xc0072489c0) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/reactor.go:735 +0x16d created by github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusReactor).AddPeer /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/reactor.go:172 +0x236 ``` because localClient is locked ``` goroutine 1899 [semacquire, 309 minutes]: sync.runtime_SemacquireMutex(0xc00003363c, 0xc0000cb500) /usr/local/go/src/runtime/sema.go:71 +0x3d sync.(*Mutex).Lock(0xc000033638) /usr/local/go/src/sync/mutex.go:134 +0xff github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client.(*localClient).SetResponseCallback(0xc0001fb560, 0xc007868540) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client/local_client.go:32 +0x33 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy.(*appConnConsensus).SetResponseCallback(0xc00002f750, 0xc007868540)
/root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy/app_conn.go:57 +0x40 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state.execBlockOnProxyApp(0x1104e20, 0xc002ca0ba0, 0x11092a0, 0xc00002f750, 0xc0001fe960, 0xc000bfc660, 0x110cfe0, 0xc000090330, 0xc9d12, 0xc000d9d5a0, ...) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state/execution.go:230 +0x1fd github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state.(*BlockExecutor).ApplyBlock(0xc002c2a230, 0x7, 0x0, 0xc000eae880, 0x6, 0xc002e52c60, 0x16, 0x1f927, 0xc9d12, 0xc000d9d5a0, ...) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/state/execution.go:96 +0x142 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).finalizeCommit(0xc001800000, 0x1f928) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1339 +0xa3e github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).tryFinalizeCommit(0xc001800000, 0x1f928) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1270 +0x451 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).enterCommit.func1(0xc001800000, 0x0, 0x1f928) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1218 +0x90 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).enterCommit(0xc001800000, 0x1f928, 0x0) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1247 +0x6b8 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).addVote(0xc001800000, 
0xc003d8dea0, 0xc000cf4cc0, 0x28, 0xf1, 0xc003bc7ad0, 0xc003bc7b10) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1659 +0xbad github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).tryAddVote(0xc001800000, 0xc003d8dea0, 0xc000cf4cc0, 0x28, 0xf1, 0xf1, 0xf1) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:1517 +0x59 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).handleMsg(0xc001800000, 0xd98200, 0xc0070dbed0, 0xc000cf4cc0, 0x28) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:660 +0x64b github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).receiveRoutine(0xc001800000, 0x0) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:617 +0x670 created by github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus.(*ConsensusState).OnStart /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/consensus/state.go:311 +0x132 ``` tx comes in and CheckTx is executed right when we execute the block ``` goroutine 111044 [semacquire, 309 minutes]: sync.runtime_SemacquireMutex(0xc00003363c, 0x0) /usr/local/go/src/runtime/sema.go:71 +0x3d sync.(*Mutex).Lock(0xc000033638) /usr/local/go/src/sync/mutex.go:134 +0xff github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client.(*localClient).CheckTxAsync(0xc0001fb0e0, 0xc002d94500, 0x13f, 0x280, 0x0) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/abci/client/local_client.go:85 +0x47 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy.(*appConnMempool).CheckTxAsync(0xc00002f720, 
0xc002d94500, 0x13f, 0x280, 0x1) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/proxy/app_conn.go:114 +0x51 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/mempool.(*Mempool).CheckTx(0xc002d3a320, 0xc002d94500, 0x13f, 0x280, 0xc0072355f0, 0x0, 0x0) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/mempool/mempool.go:316 +0x17b github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/core.BroadcastTxSync(0xc002d94500, 0x13f, 0x280, 0x0, 0x0, 0x0) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/core/mempool.go:93 +0xb8 reflect.Value.call(0xd85560, 0x10326c0, 0x13, 0xec7b8b, 0x4, 0xc00663f180, 0x1, 0x1, 0xc00663f180, 0xc00663f188, ...) /usr/local/go/src/reflect/value.go:447 +0x449 reflect.Value.Call(0xd85560, 0x10326c0, 0x13, 0xc00663f180, 0x1, 0x1, 0x0, 0x0, 0xc005cc9344) /usr/local/go/src/reflect/value.go:308 +0xa4 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.makeHTTPHandler.func2(0x1102060, 0xc00663f100, 0xc0082d7900) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/handlers.go:269 +0x188 net/http.HandlerFunc.ServeHTTP(0xc002c81f20, 0x1102060, 0xc00663f100, 0xc0082d7900) /usr/local/go/src/net/http/server.go:1964 +0x44 net/http.(*ServeMux).ServeHTTP(0xc002c81b60, 0x1102060, 0xc00663f100, 0xc0082d7900) /usr/local/go/src/net/http/server.go:2361 +0x127 github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.maxBytesHandler.ServeHTTP(0x10f8a40, 0xc002c81b60, 0xf4240, 0x1102060, 0xc00663f100, 0xc0082d7900) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/http_server.go:219 +0xcf 
github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server.RecoverAndLogHandler.func1(0x1103220, 0xc00121e620, 0xc0082d7900) /root/go/src/github.com/MinterTeam/minter-go-node/vendor/github.com/tendermint/tendermint/rpc/lib/server/http_server.go:192 +0x394 net/http.HandlerFunc.ServeHTTP(0xc002c06ea0, 0x1103220, 0xc00121e620, 0xc0082d7900) /usr/local/go/src/net/http/server.go:1964 +0x44 net/http.serverHandler.ServeHTTP(0xc001a1aa90, 0x1103220, 0xc00121e620, 0xc0082d7900) /usr/local/go/src/net/http/server.go:2741 +0xab net/http.(*conn).serve(0xc00785a3c0, 0x11041a0, 0xc000f844c0) /usr/local/go/src/net/http/server.go:1847 +0x646 created by net/http.(*Server).Serve /usr/local/go/src/net/http/server.go:2851 +0x2f5 ``` * consensus: use read lock in Receive#VoteMessage * use defer to unlock mutex because application might panic * use defer in every method of the localClient * add a changelog entry * drain channels before Unsubscribe(All) Read https://github.com/tendermint/tendermint/blob/55362ed76630f3e1ebec159a598f6a9fb5892cb1/libs/pubsub/pubsub.go#L13 for the detailed explanation of the issue. We'll need to fix it someday. 
Make sure to keep an eye on https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-033-pubsub.md * retry instead of panic when peer has no state in reactors other than consensus in /dump_consensus_state RPC endpoint, skip a peer with no state * rpc/core/mempool: simplify error messages * rpc/core/mempool: use time.After instead of timer also, do not log DeliverTx result (to be consistent with other methods) * unlock before calling the callback in reqRes#SetCallback --- CHANGELOG_PENDING.md | 3 ++ abci/client/client.go | 2 +- abci/client/grpc_client.go | 2 +- abci/client/local_client.go | 67 +++++++++++++++++++++++++----------- abci/client/socket_client.go | 2 +- consensus/reactor.go | 30 ++++++++++++---- consensus/replay_file.go | 26 ++++++++++++-- consensus/state.go | 14 +++----- evidence/reactor.go | 9 +++-- evidence/reactor_test.go | 10 ++++++ libs/pubsub/pubsub.go | 12 +++++-- mempool/mempool.go | 1 + mempool/reactor.go | 23 ++++++++----- mempool/reactor_test.go | 13 +++++++ p2p/pex/addrbook.go | 15 ++++---- rpc/client/helpers.go | 13 ++++++- rpc/core/consensus.go | 5 ++- rpc/core/mempool.go | 66 +++++++++++++++++++---------------- 18 files changed, 217 insertions(+), 96 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index ea3b9759..ea7d97c9 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -26,3 +26,6 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi ### BUG FIXES: +- [abci] unlock mutex in localClient so even when app panics (e.g. 
during CheckTx), consensus continue working +- [abci] fix DATA RACE in localClient +- [rpc] drain channel before calling Unsubscribe(All) in /broadcast_tx_commit diff --git a/abci/client/client.go b/abci/client/client.go index 55858810..e1eea5d4 100644 --- a/abci/client/client.go +++ b/abci/client/client.go @@ -105,8 +105,8 @@ func (reqRes *ReqRes) SetCallback(cb func(res *types.Response)) { return } - defer reqRes.mtx.Unlock() reqRes.cb = cb + reqRes.mtx.Unlock() } func (reqRes *ReqRes) GetCallback() func(*types.Response) { diff --git a/abci/client/grpc_client.go b/abci/client/grpc_client.go index 4f37b17b..d5cd233a 100644 --- a/abci/client/grpc_client.go +++ b/abci/client/grpc_client.go @@ -111,8 +111,8 @@ func (cli *grpcClient) Error() error { // NOTE: callback may get internally generated flush responses. func (cli *grpcClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() - defer cli.mtx.Unlock() cli.resCb = resCb + cli.mtx.Unlock() } //---------------------------------------- diff --git a/abci/client/local_client.go b/abci/client/local_client.go index 3ac3b6af..d0e50c33 100644 --- a/abci/client/local_client.go +++ b/abci/client/local_client.go @@ -9,8 +9,13 @@ import ( var _ Client = (*localClient)(nil) +// NOTE: use defer to unlock mutex because Application might panic (e.g., in +// case of malicious tx or query). It only makes sense for publicly exposed +// methods like CheckTx (/broadcast_tx_* RPC endpoint) or Query (/abci_query +// RPC endpoint), but defers are used everywhere for the sake of consistency. type localClient struct { cmn.BaseService + mtx *sync.Mutex types.Application Callback @@ -30,8 +35,8 @@ func NewLocalClient(mtx *sync.Mutex, app types.Application) *localClient { func (app *localClient) SetResponseCallback(cb Callback) { app.mtx.Lock() - defer app.mtx.Unlock() app.Callback = cb + app.mtx.Unlock() } // TODO: change types.Application to include Error()? 
@@ -45,6 +50,9 @@ func (app *localClient) FlushAsync() *ReqRes { } func (app *localClient) EchoAsync(msg string) *ReqRes { + app.mtx.Lock() + defer app.mtx.Unlock() + return app.callback( types.ToRequestEcho(msg), types.ToResponseEcho(msg), @@ -53,8 +61,9 @@ func (app *localClient) EchoAsync(msg string) *ReqRes { func (app *localClient) InfoAsync(req types.RequestInfo) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.Info(req) - app.mtx.Unlock() return app.callback( types.ToRequestInfo(req), types.ToResponseInfo(res), @@ -63,8 +72,9 @@ func (app *localClient) InfoAsync(req types.RequestInfo) *ReqRes { func (app *localClient) SetOptionAsync(req types.RequestSetOption) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.SetOption(req) - app.mtx.Unlock() return app.callback( types.ToRequestSetOption(req), types.ToResponseSetOption(res), @@ -73,8 +83,9 @@ func (app *localClient) SetOptionAsync(req types.RequestSetOption) *ReqRes { func (app *localClient) DeliverTxAsync(tx []byte) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.DeliverTx(tx) - app.mtx.Unlock() return app.callback( types.ToRequestDeliverTx(tx), types.ToResponseDeliverTx(res), @@ -83,8 +94,9 @@ func (app *localClient) DeliverTxAsync(tx []byte) *ReqRes { func (app *localClient) CheckTxAsync(tx []byte) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.CheckTx(tx) - app.mtx.Unlock() return app.callback( types.ToRequestCheckTx(tx), types.ToResponseCheckTx(res), @@ -93,8 +105,9 @@ func (app *localClient) CheckTxAsync(tx []byte) *ReqRes { func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.Query(req) - app.mtx.Unlock() return app.callback( types.ToRequestQuery(req), types.ToResponseQuery(res), @@ -103,8 +116,9 @@ func (app *localClient) QueryAsync(req types.RequestQuery) *ReqRes { func (app *localClient) CommitAsync() *ReqRes { 
app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.Commit() - app.mtx.Unlock() return app.callback( types.ToRequestCommit(), types.ToResponseCommit(res), @@ -113,19 +127,20 @@ func (app *localClient) CommitAsync() *ReqRes { func (app *localClient) InitChainAsync(req types.RequestInitChain) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.InitChain(req) - reqRes := app.callback( + return app.callback( types.ToRequestInitChain(req), types.ToResponseInitChain(res), ) - app.mtx.Unlock() - return reqRes } func (app *localClient) BeginBlockAsync(req types.RequestBeginBlock) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.BeginBlock(req) - app.mtx.Unlock() return app.callback( types.ToRequestBeginBlock(req), types.ToResponseBeginBlock(res), @@ -134,8 +149,9 @@ func (app *localClient) BeginBlockAsync(req types.RequestBeginBlock) *ReqRes { func (app *localClient) EndBlockAsync(req types.RequestEndBlock) *ReqRes { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.EndBlock(req) - app.mtx.Unlock() return app.callback( types.ToRequestEndBlock(req), types.ToResponseEndBlock(res), @@ -154,64 +170,73 @@ func (app *localClient) EchoSync(msg string) (*types.ResponseEcho, error) { func (app *localClient) InfoSync(req types.RequestInfo) (*types.ResponseInfo, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.Info(req) - app.mtx.Unlock() return &res, nil } func (app *localClient) SetOptionSync(req types.RequestSetOption) (*types.ResponseSetOption, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.SetOption(req) - app.mtx.Unlock() return &res, nil } func (app *localClient) DeliverTxSync(tx []byte) (*types.ResponseDeliverTx, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.DeliverTx(tx) - app.mtx.Unlock() return &res, nil } func (app *localClient) CheckTxSync(tx []byte) (*types.ResponseCheckTx, error) { app.mtx.Lock() + defer 
app.mtx.Unlock() + res := app.Application.CheckTx(tx) - app.mtx.Unlock() return &res, nil } func (app *localClient) QuerySync(req types.RequestQuery) (*types.ResponseQuery, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.Query(req) - app.mtx.Unlock() return &res, nil } func (app *localClient) CommitSync() (*types.ResponseCommit, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.Commit() - app.mtx.Unlock() return &res, nil } func (app *localClient) InitChainSync(req types.RequestInitChain) (*types.ResponseInitChain, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.InitChain(req) - app.mtx.Unlock() return &res, nil } func (app *localClient) BeginBlockSync(req types.RequestBeginBlock) (*types.ResponseBeginBlock, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.BeginBlock(req) - app.mtx.Unlock() return &res, nil } func (app *localClient) EndBlockSync(req types.RequestEndBlock) (*types.ResponseEndBlock, error) { app.mtx.Lock() + defer app.mtx.Unlock() + res := app.Application.EndBlock(req) - app.mtx.Unlock() return &res, nil } diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index affea1a9..531d12bc 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -118,8 +118,8 @@ func (cli *socketClient) Error() error { // NOTE: callback may get internally generated flush responses. 
func (cli *socketClient) SetResponseCallback(resCb Callback) { cli.mtx.Lock() - defer cli.mtx.Unlock() cli.resCb = resCb + cli.mtx.Unlock() } //---------------------------------------- diff --git a/consensus/reactor.go b/consensus/reactor.go index 12e8e0f1..1768a8f0 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -183,7 +183,11 @@ func (conR *ConsensusReactor) RemovePeer(peer p2p.Peer, reason interface{}) { return } // TODO - //peer.Get(PeerStateKey).(*PeerState).Disconnect() + // ps, ok := peer.Get(PeerStateKey).(*PeerState) + // if !ok { + // panic(fmt.Sprintf("Peer %v has no state", peer)) + // } + // ps.Disconnect() } // Receive implements Reactor @@ -214,7 +218,10 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg) // Get peer states - ps := src.Get(types.PeerStateKey).(*PeerState) + ps, ok := src.Get(types.PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("Peer %v has no state", src)) + } switch chID { case StateChannel: @@ -293,9 +300,9 @@ func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) switch msg := msg.(type) { case *VoteMessage: cs := conR.conS - cs.mtx.Lock() + cs.mtx.RLock() height, valSize, lastCommitSize := cs.Height, cs.Validators.Size(), cs.LastCommit.Size() - cs.mtx.Unlock() + cs.mtx.RUnlock() ps.EnsureVoteBitArrays(height, valSize) ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) @@ -428,7 +435,10 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote) { /* // TODO: Make this broadcast more selective. for _, peer := range conR.Switch.Peers().List() { - ps := peer.Get(PeerStateKey).(*PeerState) + ps, ok := peer.Get(PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("Peer %v has no state", peer)) + } prs := ps.GetRoundState() if prs.Height == vote.Height { // TODO: Also filter on round? 
@@ -826,7 +836,10 @@ func (conR *ConsensusReactor) peerStatsRoutine() { continue } // Get peer state - ps := peer.Get(types.PeerStateKey).(*PeerState) + ps, ok := peer.Get(types.PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("Peer %v has no state", peer)) + } switch msg.Msg.(type) { case *VoteMessage: if numVotes := ps.RecordVote(); numVotes%votesToContributeToBecomeGoodPeer == 0 { @@ -859,7 +872,10 @@ func (conR *ConsensusReactor) StringIndented(indent string) string { s := "ConsensusReactor{\n" s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n" for _, peer := range conR.Switch.Peers().List() { - ps := peer.Get(types.PeerStateKey).(*PeerState) + ps, ok := peer.Get(types.PeerStateKey).(*PeerState) + if !ok { + panic(fmt.Sprintf("Peer %v has no state", peer)) + } s += indent + " " + ps.StringIndented(indent+" ") + "\n" } s += indent + "}" diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 685eb71f..a326e70e 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -58,7 +58,18 @@ func (cs *ConsensusState) ReplayFile(file string, console bool) error { if err != nil { return errors.Errorf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep) } - defer cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep) + defer func() { + // drain newStepCh to make sure we don't block + LOOP: + for { + select { + case <-newStepCh: + default: + break LOOP + } + } + cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep) + }() // just open the file for reading, no need to use wal fp, err := os.OpenFile(file, os.O_RDONLY, 0600) @@ -221,7 +232,18 @@ func (pb *playback) replayConsoleLoop() int { if err != nil { cmn.Exit(fmt.Sprintf("failed to subscribe %s to %v", subscriber, types.EventQueryNewRoundStep)) } - defer pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep) + defer func() { + // drain newStepCh to make sure we don't block + LOOP: + for { + select { + 
case <-newStepCh: + default: + break LOOP + } + } + pb.cs.eventBus.Unsubscribe(ctx, subscriber, types.EventQueryNewRoundStep) + }() if len(tokens) == 1 { if err := pb.replayReset(1, newStepCh); err != nil { diff --git a/consensus/state.go b/consensus/state.go index 8c2e292c..e8603011 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -207,18 +207,16 @@ func (cs *ConsensusState) GetState() sm.State { // GetLastHeight returns the last height committed. // If there were no blocks, returns 0. func (cs *ConsensusState) GetLastHeight() int64 { - cs.mtx.Lock() - defer cs.mtx.Unlock() - + cs.mtx.RLock() + defer cs.mtx.RUnlock() return cs.RoundState.Height - 1 } // GetRoundState returns a shallow copy of the internal consensus state. func (cs *ConsensusState) GetRoundState() *cstypes.RoundState { cs.mtx.RLock() - defer cs.mtx.RUnlock() - rs := cs.RoundState // copy + cs.mtx.RUnlock() return &rs } @@ -226,7 +224,6 @@ func (cs *ConsensusState) GetRoundState() *cstypes.RoundState { func (cs *ConsensusState) GetRoundStateJSON() ([]byte, error) { cs.mtx.RLock() defer cs.mtx.RUnlock() - return cdc.MarshalJSON(cs.RoundState) } @@ -234,7 +231,6 @@ func (cs *ConsensusState) GetRoundStateJSON() ([]byte, error) { func (cs *ConsensusState) GetRoundStateSimpleJSON() ([]byte, error) { cs.mtx.RLock() defer cs.mtx.RUnlock() - return cdc.MarshalJSON(cs.RoundState.RoundStateSimple()) } @@ -248,15 +244,15 @@ func (cs *ConsensusState) GetValidators() (int64, []*types.Validator) { // SetPrivValidator sets the private validator account for signing votes. func (cs *ConsensusState) SetPrivValidator(priv types.PrivValidator) { cs.mtx.Lock() - defer cs.mtx.Unlock() cs.privValidator = priv + cs.mtx.Unlock() } // SetTimeoutTicker sets the local timer. It may be useful to overwrite for testing. 
func (cs *ConsensusState) SetTimeoutTicker(timeoutTicker TimeoutTicker) { cs.mtx.Lock() - defer cs.mtx.Unlock() cs.timeoutTicker = timeoutTicker + cs.mtx.Unlock() } // LoadCommit loads the commit for a given height. diff --git a/evidence/reactor.go b/evidence/reactor.go index 32753b2b..48092fdf 100644 --- a/evidence/reactor.go +++ b/evidence/reactor.go @@ -160,12 +160,15 @@ func (evR *EvidenceReactor) broadcastEvidenceRoutine(peer p2p.Peer) { // Returns the message to send the peer, or nil if the evidence is invalid for the peer. // If message is nil, return true if we should sleep and try again. func (evR EvidenceReactor) checkSendEvidenceMessage(peer p2p.Peer, ev types.Evidence) (msg EvidenceMessage, retry bool) { - // make sure the peer is up to date evHeight := ev.Height() peerState, ok := peer.Get(types.PeerStateKey).(PeerState) - if !ok { - evR.Logger.Info("Found peer without PeerState", "peer", peer) + if !ok { + // Peer does not have a state yet. We set it in the consensus reactor, but + // when we add peer in Switch, the order we call reactors#AddPeer is + // different every time due to us using a map. Sometimes other reactors + // will be initialized before the consensus reactor. We should wait a few + // milliseconds and retry. 
return nil, true } diff --git a/evidence/reactor_test.go b/evidence/reactor_test.go index 69dcdec5..1c4e731a 100644 --- a/evidence/reactor_test.go +++ b/evidence/reactor_test.go @@ -165,6 +165,16 @@ func TestReactorSelectiveBroadcast(t *testing.T) { // make reactors from statedb reactors := makeAndConnectEvidenceReactors(config, []dbm.DB{stateDB1, stateDB2}) + + // set the peer height on each reactor + for _, r := range reactors { + for _, peer := range r.Switch.Peers().List() { + ps := peerState{height1} + peer.Set(types.PeerStateKey, ps) + } + } + + // update the first reactor peer's height to be very small peer := reactors[0].Switch.Peers().List()[0] ps := peerState{height2} peer.Set(types.PeerStateKey, ps) diff --git a/libs/pubsub/pubsub.go b/libs/pubsub/pubsub.go index 18f098d8..b4e392bb 100644 --- a/libs/pubsub/pubsub.go +++ b/libs/pubsub/pubsub.go @@ -30,9 +30,15 @@ // // s.Subscribe(ctx, sub, qry, out) // defer func() { -// for range out { -// // drain out to make sure we don't block -// } +// // drain out to make sure we don't block +// LOOP: +// for { +// select { +// case <-out: +// default: +// break LOOP +// } +// } // s.UnsubscribeAll(ctx, sub) // }() // for msg := range out { diff --git a/mempool/mempool.go b/mempool/mempool.go index b84eb4a6..0bdb4714 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -300,6 +300,7 @@ func (mem *Mempool) TxsWaitChan() <-chan struct{} { // CONTRACT: Either cb will get called, or err returned. 
func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { mem.proxyMtx.Lock() + // use defer to unlock mutex because application (*local client*) might panic defer mem.proxyMtx.Unlock() if mem.Size() >= mem.config.Size { diff --git a/mempool/reactor.go b/mempool/reactor.go index 96988be7..072f9667 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -133,16 +133,23 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) { } memTx := next.Value.(*mempoolTx) + // make sure the peer is up to date - height := memTx.Height() - if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil { - peerState := peerState_i.(PeerState) - peerHeight := peerState.GetHeight() - if peerHeight < height-1 { // Allow for a lag of 1 block - time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) - continue - } + peerState, ok := peer.Get(types.PeerStateKey).(PeerState) + if !ok { + // Peer does not have a state yet. We set it in the consensus reactor, but + // when we add peer in Switch, the order we call reactors#AddPeer is + // different every time due to us using a map. Sometimes other reactors + // will be initialized before the consensus reactor. We should wait a few + // milliseconds and retry. 
+ time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) + continue } + if peerState.GetHeight() < memTx.Height()-1 { // Allow for a lag of 1 block + time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond) + continue + } + // send memTx msg := &TxMessage{Tx: memTx.tx} success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg)) diff --git a/mempool/reactor_test.go b/mempool/reactor_test.go index 8ac400b0..ad9ad8b4 100644 --- a/mempool/reactor_test.go +++ b/mempool/reactor_test.go @@ -21,6 +21,14 @@ import ( "github.com/tendermint/tendermint/types" ) +type peerState struct { + height int64 +} + +func (ps peerState) GetHeight() int64 { + return ps.height +} + // mempoolLogger is a TestingLogger which uses a different // color for each validator ("validator" key must exist). func mempoolLogger() log.Logger { @@ -107,6 +115,11 @@ func TestReactorBroadcastTxMessage(t *testing.T) { r.Stop() } }() + for _, r := range reactors { + for _, peer := range r.Switch.Peers().List() { + peer.Set(types.PeerStateKey, peerState{1}) + } + } // send a bunch of txs to the first reactor's mempool // and wait for them all to be received in the others diff --git a/p2p/pex/addrbook.go b/p2p/pex/addrbook.go index 405a4628..e2fcc043 100644 --- a/p2p/pex/addrbook.go +++ b/p2p/pex/addrbook.go @@ -162,10 +162,10 @@ func (a *addrBook) FilePath() string { // AddOurAddress one of our addresses. func (a *addrBook) AddOurAddress(addr *p2p.NetAddress) { - a.mtx.Lock() - defer a.mtx.Unlock() a.Logger.Info("Add our address to book", "addr", addr) + a.mtx.Lock() a.ourAddrs[addr.String()] = struct{}{} + a.mtx.Unlock() } // OurAddress returns true if it is our address. 
@@ -178,10 +178,10 @@ func (a *addrBook) OurAddress(addr *p2p.NetAddress) bool { func (a *addrBook) AddPrivateIDs(IDs []string) { a.mtx.Lock() - defer a.mtx.Unlock() for _, id := range IDs { a.privateIDs[p2p.ID(id)] = struct{}{} } + a.mtx.Unlock() } // AddAddress implements AddrBook @@ -202,7 +202,7 @@ func (a *addrBook) RemoveAddress(addr *p2p.NetAddress) { if ka == nil { return } - a.Logger.Info("Remove address from book", "addr", ka.Addr, "ID", ka.ID()) + a.Logger.Info("Remove address from book", "addr", addr) a.removeFromAllBuckets(ka) } @@ -217,8 +217,8 @@ func (a *addrBook) IsGood(addr *p2p.NetAddress) bool { // HasAddress returns true if the address is in the book. func (a *addrBook) HasAddress(addr *p2p.NetAddress) bool { a.mtx.Lock() - defer a.mtx.Unlock() ka := a.addrLookup[addr.ID] + a.mtx.Unlock() return ka != nil } @@ -461,13 +461,12 @@ ADDRS_LOOP: // ListOfKnownAddresses returns the new and old addresses. func (a *addrBook) ListOfKnownAddresses() []*knownAddress { - a.mtx.Lock() - defer a.mtx.Unlock() - addrs := []*knownAddress{} + a.mtx.Lock() for _, addr := range a.addrLookup { addrs = append(addrs, addr.copy()) } + a.mtx.Unlock() return addrs } diff --git a/rpc/client/helpers.go b/rpc/client/helpers.go index 7e64d116..2e80a306 100644 --- a/rpc/client/helpers.go +++ b/rpc/client/helpers.go @@ -69,7 +69,18 @@ func WaitForOneEvent(c EventsClient, evtTyp string, timeout time.Duration) (type } // make sure to unregister after the test is over - defer c.UnsubscribeAll(ctx, subscriber) + defer func() { + // drain evts to make sure we don't block + LOOP: + for { + select { + case <-evts: + default: + break LOOP + } + } + c.UnsubscribeAll(ctx, subscriber) + }() select { case evt := <-evts: diff --git a/rpc/core/consensus.go b/rpc/core/consensus.go index 1c2619d5..14662885 100644 --- a/rpc/core/consensus.go +++ b/rpc/core/consensus.go @@ -193,7 +193,10 @@ func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) { peers := p2pPeers.Peers().List() 
peerStates := make([]ctypes.PeerStateInfo, len(peers)) for i, peer := range peers { - peerState := peer.Get(types.PeerStateKey).(*cm.PeerState) + peerState, ok := peer.Get(types.PeerStateKey).(*cm.PeerState) + if !ok { // peer does not have a state yet + continue + } peerStateJSON, err := peerState.ToJSON() if err != nil { return nil, err diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index c015363a..59877492 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -8,7 +8,6 @@ import ( "github.com/pkg/errors" abci "github.com/tendermint/tendermint/abci/types" - cmn "github.com/tendermint/tendermint/libs/common" ctypes "github.com/tendermint/tendermint/rpc/core/types" "github.com/tendermint/tendermint/types" ) @@ -51,7 +50,7 @@ import ( func BroadcastTxAsync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { err := mempool.CheckTx(tx, nil) if err != nil { - return nil, fmt.Errorf("Error broadcasting transaction: %v", err) + return nil, err } return &ctypes.ResultBroadcastTx{Hash: tx.Hash()}, nil } @@ -94,7 +93,7 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { resCh <- res }) if err != nil { - return nil, fmt.Errorf("Error broadcasting transaction: %v", err) + return nil, err } res := <-resCh r := res.GetCheckTx() @@ -106,8 +105,9 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { }, nil } -// CONTRACT: only returns error if mempool.BroadcastTx errs (ie. problem with the app) -// or if we timeout waiting for tx to commit. +// CONTRACT: only returns error if mempool.CheckTx() errs or if we timeout +// waiting for tx to commit. +// // If CheckTx or DeliverTx fail, no error will be returned, but the returned result // will contain a non-OK ABCI code. 
// @@ -150,20 +150,31 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { // |-----------+------+---------+----------+-----------------| // | tx | Tx | nil | true | The transaction | func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { - // subscribe to tx being committed in block + // Subscribe to tx being committed in block. ctx, cancel := context.WithTimeout(context.Background(), subscribeTimeout) defer cancel() - deliverTxResCh := make(chan interface{}) + deliverTxResCh := make(chan interface{}, 1) q := types.EventQueryTxFor(tx) err := eventBus.Subscribe(ctx, "mempool", q, deliverTxResCh) if err != nil { err = errors.Wrap(err, "failed to subscribe to tx") - logger.Error("Error on broadcastTxCommit", "err", err) - return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err) + logger.Error("Error on broadcast_tx_commit", "err", err) + return nil, err } - defer eventBus.Unsubscribe(context.Background(), "mempool", q) + defer func() { + // drain deliverTxResCh to make sure we don't block + LOOP: + for { + select { + case <-deliverTxResCh: + default: + break LOOP + } + } + eventBus.Unsubscribe(context.Background(), "mempool", q) + }() - // broadcast the tx and register checktx callback + // Broadcast tx and wait for CheckTx result checkTxResCh := make(chan *abci.Response, 1) err = mempool.CheckTx(tx, func(res *abci.Response) { checkTxResCh <- res @@ -172,40 +183,35 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { logger.Error("Error on broadcastTxCommit", "err", err) return nil, fmt.Errorf("Error on broadcastTxCommit: %v", err) } - checkTxRes := <-checkTxResCh - checkTxR := checkTxRes.GetCheckTx() - if checkTxR.Code != abci.CodeTypeOK { - // CheckTx failed! 
+ checkTxResMsg := <-checkTxResCh + checkTxRes := checkTxResMsg.GetCheckTx() + if checkTxRes.Code != abci.CodeTypeOK { return &ctypes.ResultBroadcastTxCommit{ - CheckTx: *checkTxR, + CheckTx: *checkTxRes, DeliverTx: abci.ResponseDeliverTx{}, Hash: tx.Hash(), }, nil } - // Wait for the tx to be included in a block, - // timeout after something reasonable. - // TODO: configurable? - timer := time.NewTimer(60 * 2 * time.Second) + // Wait for the tx to be included in a block or timeout. + var deliverTxTimeout = 10 * time.Second // TODO: configurable? select { - case deliverTxResMsg := <-deliverTxResCh: + case deliverTxResMsg := <-deliverTxResCh: // The tx was included in a block. deliverTxRes := deliverTxResMsg.(types.EventDataTx) - // The tx was included in a block. - deliverTxR := deliverTxRes.Result - logger.Info("DeliverTx passed ", "tx", cmn.HexBytes(tx), "response", deliverTxR) return &ctypes.ResultBroadcastTxCommit{ - CheckTx: *checkTxR, - DeliverTx: deliverTxR, + CheckTx: *checkTxRes, + DeliverTx: deliverTxRes.Result, Hash: tx.Hash(), Height: deliverTxRes.Height, }, nil - case <-timer.C: - logger.Error("failed to include tx") + case <-time.After(deliverTxTimeout): + err = errors.New("Timed out waiting for tx to be included in a block") + logger.Error("Error on broadcastTxCommit", "err", err) return &ctypes.ResultBroadcastTxCommit{ - CheckTx: *checkTxR, + CheckTx: *checkTxRes, DeliverTx: abci.ResponseDeliverTx{}, Hash: tx.Hash(), - }, fmt.Errorf("Timed out waiting for transaction to be included in a block") + }, err } }