mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-13 13:21:20 +00:00
fixes from Jae's review
1. remove pointer 2. add Quit() method to Service interface
This commit is contained in:
@ -534,10 +534,10 @@ OUTER_LOOP:
|
|||||||
// Send request and wait.
|
// Send request and wait.
|
||||||
bpr.pool.sendRequest(bpr.height, peer.id)
|
bpr.pool.sendRequest(bpr.height, peer.id)
|
||||||
select {
|
select {
|
||||||
case <-bpr.pool.Quit:
|
case <-bpr.pool.Quit():
|
||||||
bpr.Stop()
|
bpr.Stop()
|
||||||
return
|
return
|
||||||
case <-bpr.Quit:
|
case <-bpr.Quit():
|
||||||
return
|
return
|
||||||
case <-bpr.redoCh:
|
case <-bpr.redoCh:
|
||||||
bpr.reset()
|
bpr.reset()
|
||||||
@ -545,10 +545,10 @@ OUTER_LOOP:
|
|||||||
case <-bpr.gotBlockCh:
|
case <-bpr.gotBlockCh:
|
||||||
// We got the block, now see if it's good.
|
// We got the block, now see if it's good.
|
||||||
select {
|
select {
|
||||||
case <-bpr.pool.Quit:
|
case <-bpr.pool.Quit():
|
||||||
bpr.Stop()
|
bpr.Stop()
|
||||||
return
|
return
|
||||||
case <-bpr.Quit:
|
case <-bpr.Quit():
|
||||||
return
|
return
|
||||||
case <-bpr.redoCh:
|
case <-bpr.redoCh:
|
||||||
bpr.reset()
|
bpr.reset()
|
||||||
|
@ -322,7 +322,7 @@ FOR_LOOP:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
continue FOR_LOOP
|
continue FOR_LOOP
|
||||||
case <-bcR.Quit:
|
case <-bcR.Quit():
|
||||||
break FOR_LOOP
|
break FOR_LOOP
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -157,7 +157,7 @@ func makeBlock(height int64, state sm.State) *types.Block {
|
|||||||
|
|
||||||
// The Test peer
|
// The Test peer
|
||||||
type bcrTestPeer struct {
|
type bcrTestPeer struct {
|
||||||
*cmn.BaseService
|
cmn.BaseService
|
||||||
id p2p.ID
|
id p2p.ID
|
||||||
ch chan interface{}
|
ch chan interface{}
|
||||||
}
|
}
|
||||||
@ -169,7 +169,7 @@ func newbcrTestPeer(id p2p.ID) *bcrTestPeer {
|
|||||||
id: id,
|
id: id,
|
||||||
ch: make(chan interface{}, 2),
|
ch: make(chan interface{}, 2),
|
||||||
}
|
}
|
||||||
bcr.BaseService = cmn.NewBaseService(nil, "bcrTestPeer", bcr)
|
bcr.BaseService = *cmn.NewBaseService(nil, "bcrTestPeer", bcr)
|
||||||
return bcr
|
return bcr
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -196,4 +196,3 @@ func (tp *bcrTestPeer) IsOutbound() bool { return false }
|
|||||||
func (tp *bcrTestPeer) IsPersistent() bool { return true }
|
func (tp *bcrTestPeer) IsPersistent() bool { return true }
|
||||||
func (tp *bcrTestPeer) Get(s string) interface{} { return s }
|
func (tp *bcrTestPeer) Get(s string) interface{} { return s }
|
||||||
func (tp *bcrTestPeer) Set(string, interface{}) {}
|
func (tp *bcrTestPeer) Set(string, interface{}) {}
|
||||||
func (tp *bcrTestPeer) QuitChan() <-chan struct{} { return tp.Quit }
|
|
||||||
|
@ -380,7 +380,7 @@ func (conR *ConsensusReactor) startBroadcastRoutine() error {
|
|||||||
edph := data.(types.TMEventData).Unwrap().(types.EventDataProposalHeartbeat)
|
edph := data.(types.TMEventData).Unwrap().(types.EventDataProposalHeartbeat)
|
||||||
conR.broadcastProposalHeartbeatMessage(edph)
|
conR.broadcastProposalHeartbeatMessage(edph)
|
||||||
}
|
}
|
||||||
case <-conR.Quit:
|
case <-conR.Quit():
|
||||||
conR.eventBus.UnsubscribeAll(ctx, subscriber)
|
conR.eventBus.UnsubscribeAll(ctx, subscriber)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -541,7 +541,7 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) {
|
|||||||
// if the timeout is relevant to the rs
|
// if the timeout is relevant to the rs
|
||||||
// go to the next step
|
// go to the next step
|
||||||
cs.handleTimeout(ti, rs)
|
cs.handleTimeout(ti, rs)
|
||||||
case <-cs.Quit:
|
case <-cs.Quit():
|
||||||
|
|
||||||
// NOTE: the internalMsgQueue may have signed messages from our
|
// NOTE: the internalMsgQueue may have signed messages from our
|
||||||
// priv_val that haven't hit the WAL, but its ok because
|
// priv_val that haven't hit the WAL, but its ok because
|
||||||
|
@ -127,7 +127,7 @@ func (t *timeoutTicker) timeoutRoutine() {
|
|||||||
// We can eliminate it by merging the timeoutRoutine into receiveRoutine
|
// We can eliminate it by merging the timeoutRoutine into receiveRoutine
|
||||||
// and managing the timeouts ourselves with a millisecond ticker
|
// and managing the timeouts ourselves with a millisecond ticker
|
||||||
go func(toi timeoutInfo) { t.tockChan <- toi }(ti)
|
go func(toi timeoutInfo) { t.tockChan <- toi }(ti)
|
||||||
case <-t.Quit:
|
case <-t.Quit():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -126,7 +126,7 @@ func (evR *EvidenceReactor) broadcastRoutine() {
|
|||||||
// broadcast all pending evidence
|
// broadcast all pending evidence
|
||||||
msg := &EvidenceListMessage{evR.evpool.PendingEvidence()}
|
msg := &EvidenceListMessage{evR.evpool.PendingEvidence()}
|
||||||
evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg})
|
evR.Switch.Broadcast(EvidenceChannel, struct{ EvidenceMessage }{msg})
|
||||||
case <-evR.Quit:
|
case <-evR.Quit():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
8
glide.lock
generated
8
glide.lock
generated
@ -1,5 +1,5 @@
|
|||||||
hash: 41f411204b59e893053e59cda43466b3a6634c5fc88698d1f3131ecd5f239de7
|
hash: 0a994be202cfc9c8a820c5a68321bbbf5592f48790b9bd408b5f95cd344c3be5
|
||||||
updated: 2018-02-09T09:56:16.586709479Z
|
updated: 2018-02-12T08:29:16.126185849Z
|
||||||
imports:
|
imports:
|
||||||
- name: github.com/btcsuite/btcd
|
- name: github.com/btcsuite/btcd
|
||||||
version: 50de9da05b50eb15658bb350f6ea24368a111ab7
|
version: 50de9da05b50eb15658bb350f6ea24368a111ab7
|
||||||
@ -97,7 +97,7 @@ imports:
|
|||||||
- leveldb/table
|
- leveldb/table
|
||||||
- leveldb/util
|
- leveldb/util
|
||||||
- name: github.com/tendermint/abci
|
- name: github.com/tendermint/abci
|
||||||
version: 5a4f56056e23cdfd5f3733db056968e016468508
|
version: 5913ae8960c7ae5d748c37aa060bd35c99ff8a05
|
||||||
subpackages:
|
subpackages:
|
||||||
- client
|
- client
|
||||||
- example/code
|
- example/code
|
||||||
@ -117,7 +117,7 @@ imports:
|
|||||||
subpackages:
|
subpackages:
|
||||||
- data
|
- data
|
||||||
- name: github.com/tendermint/tmlibs
|
- name: github.com/tendermint/tmlibs
|
||||||
version: 52ce4c20f8bc9b6da5fc1274bcce27c0b9dd738a
|
version: a57340ffb53aefb0fca1fc610d18fcbcc61b126f
|
||||||
subpackages:
|
subpackages:
|
||||||
- autofile
|
- autofile
|
||||||
- cli
|
- cli
|
||||||
|
@ -19,7 +19,7 @@ import:
|
|||||||
- package: github.com/spf13/viper
|
- package: github.com/spf13/viper
|
||||||
version: v1.0.0
|
version: v1.0.0
|
||||||
- package: github.com/tendermint/abci
|
- package: github.com/tendermint/abci
|
||||||
version: develop
|
version: 5913ae8960c7ae5d748c37aa060bd35c99ff8a05
|
||||||
subpackages:
|
subpackages:
|
||||||
- client
|
- client
|
||||||
- example/dummy
|
- example/dummy
|
||||||
@ -31,7 +31,7 @@ import:
|
|||||||
subpackages:
|
subpackages:
|
||||||
- data
|
- data
|
||||||
- package: github.com/tendermint/tmlibs
|
- package: github.com/tendermint/tmlibs
|
||||||
version: develop
|
version: a57340ffb53aefb0fca1fc610d18fcbcc61b126f
|
||||||
subpackages:
|
subpackages:
|
||||||
- autofile
|
- autofile
|
||||||
- cli
|
- cli
|
||||||
|
@ -117,9 +117,9 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
|
|||||||
if next = memR.Mempool.TxsFront(); next == nil {
|
if next = memR.Mempool.TxsFront(); next == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
case <-peer.QuitChan():
|
case <-peer.Quit():
|
||||||
return
|
return
|
||||||
case <-memR.Quit:
|
case <-memR.Quit():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -146,9 +146,9 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
|
|||||||
case <-next.NextWaitChan():
|
case <-next.NextWaitChan():
|
||||||
// see the start of the for loop for nil check
|
// see the start of the for loop for nil check
|
||||||
next = next.Next()
|
next = next.Next()
|
||||||
case <-peer.QuitChan():
|
case <-peer.Quit():
|
||||||
return
|
return
|
||||||
case <-memR.Quit:
|
case <-memR.Quit():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -41,7 +41,7 @@ func TestNodeStartStop(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-n.Quit:
|
case <-n.Quit():
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(5 * time.Second):
|
||||||
t.Fatal("timed out waiting for shutdown")
|
t.Fatal("timed out waiting for shutdown")
|
||||||
}
|
}
|
||||||
|
@ -18,7 +18,6 @@ import (
|
|||||||
// Peer is an interface representing a peer connected on a reactor.
|
// Peer is an interface representing a peer connected on a reactor.
|
||||||
type Peer interface {
|
type Peer interface {
|
||||||
cmn.Service
|
cmn.Service
|
||||||
QuitChan() <-chan struct{}
|
|
||||||
|
|
||||||
ID() ID // peer's cryptographic ID
|
ID() ID // peer's cryptographic ID
|
||||||
IsOutbound() bool // did we dial the peer
|
IsOutbound() bool // did we dial the peer
|
||||||
@ -332,11 +331,6 @@ func (p *peer) String() string {
|
|||||||
return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
|
return fmt.Sprintf("Peer{%v %v in}", p.mconn, p.ID())
|
||||||
}
|
}
|
||||||
|
|
||||||
// QuitChan returns a channel, which will be closed once peer is stopped.
|
|
||||||
func (p *peer) QuitChan() <-chan struct{} {
|
|
||||||
return p.Quit
|
|
||||||
}
|
|
||||||
|
|
||||||
//------------------------------------------------------------------
|
//------------------------------------------------------------------
|
||||||
// helper funcs
|
// helper funcs
|
||||||
|
|
||||||
|
@ -332,7 +332,7 @@ out:
|
|||||||
select {
|
select {
|
||||||
case <-saveFileTicker.C:
|
case <-saveFileTicker.C:
|
||||||
a.saveToFile(a.filePath)
|
a.saveToFile(a.filePath)
|
||||||
case <-a.Quit:
|
case <-a.Quit():
|
||||||
break out
|
break out
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -274,7 +274,7 @@ func (r *PEXReactor) ensurePeersRoutine() {
|
|||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
r.ensurePeers()
|
r.ensurePeers()
|
||||||
case <-r.Quit:
|
case <-r.Quit():
|
||||||
ticker.Stop()
|
ticker.Stop()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -409,7 +409,7 @@ func (r *PEXReactor) crawlPeersRoutine() {
|
|||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
r.attemptDisconnects()
|
r.attemptDisconnects()
|
||||||
r.crawlPeers()
|
r.crawlPeers()
|
||||||
case <-r.Quit:
|
case <-r.Quit():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -368,4 +368,3 @@ func (mp mockPeer) Send(byte, interface{}) bool { return false }
|
|||||||
func (mp mockPeer) TrySend(byte, interface{}) bool { return false }
|
func (mp mockPeer) TrySend(byte, interface{}) bool { return false }
|
||||||
func (mp mockPeer) Set(string, interface{}) {}
|
func (mp mockPeer) Set(string, interface{}) {}
|
||||||
func (mp mockPeer) Get(string) interface{} { return nil }
|
func (mp mockPeer) Get(string) interface{} { return nil }
|
||||||
func (mp mockPeer) QuitChan() <-chan struct{} { return mp.Quit }
|
|
||||||
|
@ -118,7 +118,7 @@ func (tm *TrustMetric) OnStart() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// OnStop implements Service
|
// OnStop implements Service
|
||||||
// Nothing to do since the goroutine shuts down by itself via BaseService.Quit
|
// Nothing to do since the goroutine shuts down by itself via BaseService.Quit()
|
||||||
func (tm *TrustMetric) OnStop() {}
|
func (tm *TrustMetric) OnStop() {}
|
||||||
|
|
||||||
// Returns a snapshot of the trust metric history data
|
// Returns a snapshot of the trust metric history data
|
||||||
@ -298,7 +298,7 @@ loop:
|
|||||||
select {
|
select {
|
||||||
case <-tick:
|
case <-tick:
|
||||||
tm.NextTimeInterval()
|
tm.NextTimeInterval()
|
||||||
case <-tm.Quit:
|
case <-tm.Quit():
|
||||||
// Stop all further tracking for this metric
|
// Stop all further tracking for this metric
|
||||||
break loop
|
break loop
|
||||||
}
|
}
|
||||||
|
@ -200,7 +200,7 @@ loop:
|
|||||||
select {
|
select {
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
tms.SaveToDB()
|
tms.SaveToDB()
|
||||||
case <-tms.Quit:
|
case <-tms.Quit():
|
||||||
break loop
|
break loop
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -338,7 +338,7 @@ func (w *WSEvents) eventListener() {
|
|||||||
ch <- result.Data
|
ch <- result.Data
|
||||||
}
|
}
|
||||||
w.mtx.RUnlock()
|
w.mtx.RUnlock()
|
||||||
case <-w.Quit:
|
case <-w.Quit():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -335,7 +335,7 @@ func (c *WSClient) reconnectRoutine() {
|
|||||||
c.startReadWriteRoutines()
|
c.startReadWriteRoutines()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case <-c.Quit:
|
case <-c.Quit():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -394,7 +394,7 @@ func (c *WSClient) writeRoutine() {
|
|||||||
c.Logger.Debug("sent ping")
|
c.Logger.Debug("sent ping")
|
||||||
case <-c.readRoutineQuit:
|
case <-c.readRoutineQuit:
|
||||||
return
|
return
|
||||||
case <-c.Quit:
|
case <-c.Quit():
|
||||||
if err := c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
|
if err := c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")); err != nil {
|
||||||
c.Logger.Error("failed to write message", "err", err)
|
c.Logger.Error("failed to write message", "err", err)
|
||||||
}
|
}
|
||||||
@ -455,7 +455,7 @@ func (c *WSClient) readRoutine() {
|
|||||||
// c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop
|
// c.wg.Wait() in c.Stop(). Note we rely on Quit being closed so that it sends unlimited Quit signals to stop
|
||||||
// both readRoutine and writeRoutine
|
// both readRoutine and writeRoutine
|
||||||
select {
|
select {
|
||||||
case <-c.Quit:
|
case <-c.Quit():
|
||||||
case c.ResponsesCh <- response:
|
case c.ResponsesCh <- response:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -132,7 +132,7 @@ func TestWSClientReconnectFailure(t *testing.T) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-c.ResponsesCh:
|
case <-c.ResponsesCh:
|
||||||
case <-c.Quit:
|
case <-c.Quit():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -217,7 +217,7 @@ func callWgDoneOnResult(t *testing.T, c *WSClient, wg *sync.WaitGroup) {
|
|||||||
if resp.Result != nil {
|
if resp.Result != nil {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
}
|
}
|
||||||
case <-c.Quit:
|
case <-c.Quit():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -484,7 +484,7 @@ func (wsc *wsConnection) GetEventSubscriber() types.EventSubscriber {
|
|||||||
// It implements WSRPCConnection. It is Goroutine-safe.
|
// It implements WSRPCConnection. It is Goroutine-safe.
|
||||||
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
|
func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
|
||||||
select {
|
select {
|
||||||
case <-wsc.Quit:
|
case <-wsc.Quit():
|
||||||
return
|
return
|
||||||
case wsc.writeChan <- resp:
|
case wsc.writeChan <- resp:
|
||||||
}
|
}
|
||||||
@ -494,7 +494,7 @@ func (wsc *wsConnection) WriteRPCResponse(resp types.RPCResponse) {
|
|||||||
// It implements WSRPCConnection. It is Goroutine-safe
|
// It implements WSRPCConnection. It is Goroutine-safe
|
||||||
func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
|
func (wsc *wsConnection) TryWriteRPCResponse(resp types.RPCResponse) bool {
|
||||||
select {
|
select {
|
||||||
case <-wsc.Quit:
|
case <-wsc.Quit():
|
||||||
return false
|
return false
|
||||||
case wsc.writeChan <- resp:
|
case wsc.writeChan <- resp:
|
||||||
return true
|
return true
|
||||||
@ -525,7 +525,7 @@ func (wsc *wsConnection) readRoutine() {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-wsc.Quit:
|
case <-wsc.Quit():
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
// reset deadline for every type of message (control or data)
|
// reset deadline for every type of message (control or data)
|
||||||
@ -643,7 +643,7 @@ func (wsc *wsConnection) writeRoutine() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case <-wsc.Quit:
|
case <-wsc.Quit():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user