mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-21 08:51:32 +00:00
type safe events
This commit is contained in:
@ -8,7 +8,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
. "github.com/tendermint/go-common"
|
. "github.com/tendermint/go-common"
|
||||||
"github.com/tendermint/go-events"
|
|
||||||
"github.com/tendermint/go-p2p"
|
"github.com/tendermint/go-p2p"
|
||||||
"github.com/tendermint/go-wire"
|
"github.com/tendermint/go-wire"
|
||||||
"github.com/tendermint/tendermint/proxy"
|
"github.com/tendermint/tendermint/proxy"
|
||||||
@ -52,7 +51,7 @@ type BlockchainReactor struct {
|
|||||||
timeoutsCh chan string
|
timeoutsCh chan string
|
||||||
lastBlock *types.Block
|
lastBlock *types.Block
|
||||||
|
|
||||||
evsw *events.EventSwitch
|
evsw types.EventSwitch
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor {
|
func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor {
|
||||||
@ -268,7 +267,7 @@ func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// implements events.Eventable
|
// implements events.Eventable
|
||||||
func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) {
|
func (bcR *BlockchainReactor) SetEventSwitch(evsw types.EventSwitch) {
|
||||||
bcR.evsw = evsw
|
bcR.evsw = evsw
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,14 +1,14 @@
|
|||||||
package consensus
|
package consensus
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/tendermint/go-events"
|
"github.com/tendermint/tendermint/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NOTE: this is blocking
|
// NOTE: this is blocking
|
||||||
func subscribeToEvent(evsw *events.EventSwitch, receiver, eventID string, chanCap int) chan interface{} {
|
func subscribeToEvent(evsw types.EventSwitch, receiver, eventID string, chanCap int) chan interface{} {
|
||||||
// listen for new round
|
// listen for new round
|
||||||
ch := make(chan interface{}, chanCap)
|
ch := make(chan interface{}, chanCap)
|
||||||
evsw.AddListenerForEvent(receiver, eventID, func(data events.EventData) {
|
types.AddListenerForEvent(evsw, receiver, eventID, func(data types.TMEventData) {
|
||||||
ch <- data
|
ch <- data
|
||||||
})
|
})
|
||||||
return ch
|
return ch
|
||||||
|
@ -10,7 +10,6 @@ import (
|
|||||||
|
|
||||||
cfg "github.com/tendermint/go-config"
|
cfg "github.com/tendermint/go-config"
|
||||||
dbm "github.com/tendermint/go-db"
|
dbm "github.com/tendermint/go-db"
|
||||||
"github.com/tendermint/go-events"
|
|
||||||
bc "github.com/tendermint/tendermint/blockchain"
|
bc "github.com/tendermint/tendermint/blockchain"
|
||||||
mempl "github.com/tendermint/tendermint/mempool"
|
mempl "github.com/tendermint/tendermint/mempool"
|
||||||
sm "github.com/tendermint/tendermint/state"
|
sm "github.com/tendermint/tendermint/state"
|
||||||
@ -338,7 +337,7 @@ func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Applic
|
|||||||
cs := NewConsensusState(config, state, proxyAppConnCon, blockStore, mempool)
|
cs := NewConsensusState(config, state, proxyAppConnCon, blockStore, mempool)
|
||||||
cs.SetPrivValidator(pv)
|
cs.SetPrivValidator(pv)
|
||||||
|
|
||||||
evsw := events.NewEventSwitch()
|
evsw := types.NewEventSwitch()
|
||||||
cs.SetEventSwitch(evsw)
|
cs.SetEventSwitch(evsw)
|
||||||
evsw.Start()
|
evsw.Start()
|
||||||
return cs
|
return cs
|
||||||
|
@ -9,7 +9,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
. "github.com/tendermint/go-common"
|
. "github.com/tendermint/go-common"
|
||||||
"github.com/tendermint/go-events"
|
|
||||||
"github.com/tendermint/go-p2p"
|
"github.com/tendermint/go-p2p"
|
||||||
"github.com/tendermint/go-wire"
|
"github.com/tendermint/go-wire"
|
||||||
bc "github.com/tendermint/tendermint/blockchain"
|
bc "github.com/tendermint/tendermint/blockchain"
|
||||||
@ -34,7 +33,7 @@ type ConsensusReactor struct {
|
|||||||
blockStore *bc.BlockStore
|
blockStore *bc.BlockStore
|
||||||
conS *ConsensusState
|
conS *ConsensusState
|
||||||
fastSync bool
|
fastSync bool
|
||||||
evsw *events.EventSwitch
|
evsw types.EventSwitch
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
|
func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
|
||||||
@ -225,7 +224,7 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// implements events.Eventable
|
// implements events.Eventable
|
||||||
func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
|
func (conR *ConsensusReactor) SetEventSwitch(evsw types.EventSwitch) {
|
||||||
conR.evsw = evsw
|
conR.evsw = evsw
|
||||||
conR.conS.SetEventSwitch(evsw)
|
conR.conS.SetEventSwitch(evsw)
|
||||||
}
|
}
|
||||||
@ -236,12 +235,12 @@ func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
|
|||||||
// broadcasting the result to peers
|
// broadcasting the result to peers
|
||||||
func (conR *ConsensusReactor) registerEventCallbacks() {
|
func (conR *ConsensusReactor) registerEventCallbacks() {
|
||||||
|
|
||||||
conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data events.EventData) {
|
types.AddListenerForEvent(conR.evsw, "conR", types.EventStringNewRoundStep(), func(data types.TMEventData) {
|
||||||
rs := data.(types.EventDataRoundState).RoundState.(*RoundState)
|
rs := data.(types.EventDataRoundState).RoundState.(*RoundState)
|
||||||
conR.broadcastNewRoundStep(rs)
|
conR.broadcastNewRoundStep(rs)
|
||||||
})
|
})
|
||||||
|
|
||||||
conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data events.EventData) {
|
types.AddListenerForEvent(conR.evsw, "conR", types.EventStringVote(), func(data types.TMEventData) {
|
||||||
edv := data.(types.EventDataVote)
|
edv := data.(types.EventDataVote)
|
||||||
conR.broadcastHasVoteMessage(edv.Vote, edv.Index)
|
conR.broadcastHasVoteMessage(edv.Vote, edv.Index)
|
||||||
})
|
})
|
||||||
|
@ -9,7 +9,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
. "github.com/tendermint/go-common"
|
. "github.com/tendermint/go-common"
|
||||||
"github.com/tendermint/go-events"
|
|
||||||
"github.com/tendermint/go-wire"
|
"github.com/tendermint/go-wire"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
)
|
)
|
||||||
@ -170,7 +169,7 @@ func TestReplayCrashBeforeWritePropose(t *testing.T) {
|
|||||||
|
|
||||||
func TestReplayCrashBeforeWritePrevote(t *testing.T) {
|
func TestReplayCrashBeforeWritePrevote(t *testing.T) {
|
||||||
cs, newBlockCh, voteMsg, f := setupReplayTest(5, false) // prevote
|
cs, newBlockCh, voteMsg, f := setupReplayTest(5, false) // prevote
|
||||||
cs.evsw.AddListenerForEvent("tester", types.EventStringCompleteProposal(), func(data events.EventData) {
|
types.AddListenerForEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), func(data types.TMEventData) {
|
||||||
// Set LastSig
|
// Set LastSig
|
||||||
var err error
|
var err error
|
||||||
var msg ConsensusLogMessage
|
var msg ConsensusLogMessage
|
||||||
@ -187,7 +186,7 @@ func TestReplayCrashBeforeWritePrevote(t *testing.T) {
|
|||||||
|
|
||||||
func TestReplayCrashBeforeWritePrecommit(t *testing.T) {
|
func TestReplayCrashBeforeWritePrecommit(t *testing.T) {
|
||||||
cs, newBlockCh, voteMsg, f := setupReplayTest(7, false) // precommit
|
cs, newBlockCh, voteMsg, f := setupReplayTest(7, false) // precommit
|
||||||
cs.evsw.AddListenerForEvent("tester", types.EventStringPolka(), func(data events.EventData) {
|
types.AddListenerForEvent(cs.evsw, "tester", types.EventStringPolka(), func(data types.TMEventData) {
|
||||||
// Set LastSig
|
// Set LastSig
|
||||||
var err error
|
var err error
|
||||||
var msg ConsensusLogMessage
|
var msg ConsensusLogMessage
|
||||||
|
@ -10,7 +10,6 @@ import (
|
|||||||
|
|
||||||
. "github.com/tendermint/go-common"
|
. "github.com/tendermint/go-common"
|
||||||
cfg "github.com/tendermint/go-config"
|
cfg "github.com/tendermint/go-config"
|
||||||
"github.com/tendermint/go-events"
|
|
||||||
"github.com/tendermint/go-wire"
|
"github.com/tendermint/go-wire"
|
||||||
bc "github.com/tendermint/tendermint/blockchain"
|
bc "github.com/tendermint/tendermint/blockchain"
|
||||||
mempl "github.com/tendermint/tendermint/mempool"
|
mempl "github.com/tendermint/tendermint/mempool"
|
||||||
@ -231,7 +230,7 @@ type ConsensusState struct {
|
|||||||
tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine
|
tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine
|
||||||
timeoutParams *TimeoutParams // parameters and functions for timeout intervals
|
timeoutParams *TimeoutParams // parameters and functions for timeout intervals
|
||||||
|
|
||||||
evsw *events.EventSwitch
|
evsw types.EventSwitch
|
||||||
|
|
||||||
wal *WAL
|
wal *WAL
|
||||||
replayMode bool // so we don't log signing errors during replay
|
replayMode bool // so we don't log signing errors during replay
|
||||||
@ -264,7 +263,7 @@ func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.Ap
|
|||||||
// Public interface
|
// Public interface
|
||||||
|
|
||||||
// implements events.Eventable
|
// implements events.Eventable
|
||||||
func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) {
|
func (cs *ConsensusState) SetEventSwitch(evsw types.EventSwitch) {
|
||||||
cs.evsw = evsw
|
cs.evsw = evsw
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -545,7 +544,7 @@ func (cs *ConsensusState) newStep() {
|
|||||||
cs.nSteps += 1
|
cs.nSteps += 1
|
||||||
// newStep is called by updateToStep in NewConsensusState before the evsw is set!
|
// newStep is called by updateToStep in NewConsensusState before the evsw is set!
|
||||||
if cs.evsw != nil {
|
if cs.evsw != nil {
|
||||||
cs.evsw.FireEvent(types.EventStringNewRoundStep(), rs)
|
types.FireEventNewRoundStep(cs.evsw, rs)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -719,13 +718,13 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
|
|||||||
// XXX: should we fire timeout here?
|
// XXX: should we fire timeout here?
|
||||||
cs.enterNewRound(ti.Height, 0)
|
cs.enterNewRound(ti.Height, 0)
|
||||||
case RoundStepPropose:
|
case RoundStepPropose:
|
||||||
cs.evsw.FireEvent(types.EventStringTimeoutPropose(), cs.RoundStateEvent())
|
types.FireEventTimeoutPropose(cs.evsw, cs.RoundStateEvent())
|
||||||
cs.enterPrevote(ti.Height, ti.Round)
|
cs.enterPrevote(ti.Height, ti.Round)
|
||||||
case RoundStepPrevoteWait:
|
case RoundStepPrevoteWait:
|
||||||
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
|
types.FireEventTimeoutWait(cs.evsw, cs.RoundStateEvent())
|
||||||
cs.enterPrecommit(ti.Height, ti.Round)
|
cs.enterPrecommit(ti.Height, ti.Round)
|
||||||
case RoundStepPrecommitWait:
|
case RoundStepPrecommitWait:
|
||||||
cs.evsw.FireEvent(types.EventStringTimeoutWait(), cs.RoundStateEvent())
|
types.FireEventTimeoutWait(cs.evsw, cs.RoundStateEvent())
|
||||||
cs.enterNewRound(ti.Height, ti.Round+1)
|
cs.enterNewRound(ti.Height, ti.Round+1)
|
||||||
default:
|
default:
|
||||||
panic(Fmt("Invalid timeout step: %v", ti.Step))
|
panic(Fmt("Invalid timeout step: %v", ti.Step))
|
||||||
@ -777,7 +776,7 @@ func (cs *ConsensusState) enterNewRound(height int, round int) {
|
|||||||
}
|
}
|
||||||
cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping
|
cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping
|
||||||
|
|
||||||
cs.evsw.FireEvent(types.EventStringNewRound(), cs.RoundStateEvent())
|
types.FireEventNewRound(cs.evsw, cs.RoundStateEvent())
|
||||||
|
|
||||||
// Immediately go to enterPropose.
|
// Immediately go to enterPropose.
|
||||||
cs.enterPropose(height, round)
|
cs.enterPropose(height, round)
|
||||||
@ -942,7 +941,7 @@ func (cs *ConsensusState) enterPrevote(height int, round int) {
|
|||||||
|
|
||||||
// fire event for how we got here
|
// fire event for how we got here
|
||||||
if cs.isProposalComplete() {
|
if cs.isProposalComplete() {
|
||||||
cs.evsw.FireEvent(types.EventStringCompleteProposal(), cs.RoundStateEvent())
|
types.FireEventCompleteProposal(cs.evsw, cs.RoundStateEvent())
|
||||||
} else {
|
} else {
|
||||||
// we received +2/3 prevotes for a future round
|
// we received +2/3 prevotes for a future round
|
||||||
// TODO: catchup event?
|
// TODO: catchup event?
|
||||||
@ -1047,7 +1046,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// At this point +2/3 prevoted for a particular block or nil
|
// At this point +2/3 prevoted for a particular block or nil
|
||||||
cs.evsw.FireEvent(types.EventStringPolka(), cs.RoundStateEvent())
|
types.FireEventPolka(cs.evsw, cs.RoundStateEvent())
|
||||||
|
|
||||||
// the latest POLRound should be this round
|
// the latest POLRound should be this round
|
||||||
if cs.Votes.POLRound() < round {
|
if cs.Votes.POLRound() < round {
|
||||||
@ -1063,7 +1062,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
|
|||||||
cs.LockedRound = 0
|
cs.LockedRound = 0
|
||||||
cs.LockedBlock = nil
|
cs.LockedBlock = nil
|
||||||
cs.LockedBlockParts = nil
|
cs.LockedBlockParts = nil
|
||||||
cs.evsw.FireEvent(types.EventStringUnlock(), cs.RoundStateEvent())
|
types.FireEventUnlock(cs.evsw, cs.RoundStateEvent())
|
||||||
}
|
}
|
||||||
cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{})
|
cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{})
|
||||||
return
|
return
|
||||||
@ -1075,7 +1074,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
|
|||||||
if cs.LockedBlock.HashesTo(hash) {
|
if cs.LockedBlock.HashesTo(hash) {
|
||||||
log.Notice("enterPrecommit: +2/3 prevoted locked block. Relocking")
|
log.Notice("enterPrecommit: +2/3 prevoted locked block. Relocking")
|
||||||
cs.LockedRound = round
|
cs.LockedRound = round
|
||||||
cs.evsw.FireEvent(types.EventStringRelock(), cs.RoundStateEvent())
|
types.FireEventRelock(cs.evsw, cs.RoundStateEvent())
|
||||||
cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader)
|
cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -1090,7 +1089,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
|
|||||||
cs.LockedRound = round
|
cs.LockedRound = round
|
||||||
cs.LockedBlock = cs.ProposalBlock
|
cs.LockedBlock = cs.ProposalBlock
|
||||||
cs.LockedBlockParts = cs.ProposalBlockParts
|
cs.LockedBlockParts = cs.ProposalBlockParts
|
||||||
cs.evsw.FireEvent(types.EventStringLock(), cs.RoundStateEvent())
|
types.FireEventLock(cs.evsw, cs.RoundStateEvent())
|
||||||
cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader)
|
cs.signAddVote(types.VoteTypePrecommit, hash, partsHeader)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -1106,7 +1105,7 @@ func (cs *ConsensusState) enterPrecommit(height int, round int) {
|
|||||||
cs.ProposalBlock = nil
|
cs.ProposalBlock = nil
|
||||||
cs.ProposalBlockParts = types.NewPartSetFromHeader(partsHeader)
|
cs.ProposalBlockParts = types.NewPartSetFromHeader(partsHeader)
|
||||||
}
|
}
|
||||||
cs.evsw.FireEvent(types.EventStringUnlock(), cs.RoundStateEvent())
|
types.FireEventUnlock(cs.evsw, cs.RoundStateEvent())
|
||||||
cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{})
|
cs.signAddVote(types.VoteTypePrecommit, nil, types.PartSetHeader{})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -1226,14 +1225,14 @@ func (cs *ConsensusState) finalizeCommit(height int) {
|
|||||||
|
|
||||||
// Fire off event for new block.
|
// Fire off event for new block.
|
||||||
// TODO: Handle app failure. See #177
|
// TODO: Handle app failure. See #177
|
||||||
cs.evsw.FireEvent(types.EventStringNewBlock(), types.EventDataNewBlock{block})
|
types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block})
|
||||||
cs.evsw.FireEvent(types.EventStringNewBlockHeader(), types.EventDataNewBlockHeader{block.Header})
|
types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header})
|
||||||
|
|
||||||
// Create a copy of the state for staging
|
// Create a copy of the state for staging
|
||||||
stateCopy := cs.state.Copy()
|
stateCopy := cs.state.Copy()
|
||||||
|
|
||||||
// event cache for txs
|
// event cache for txs
|
||||||
eventCache := events.NewEventCache(cs.evsw)
|
eventCache := types.NewEventCache(cs.evsw)
|
||||||
|
|
||||||
// Run the block on the State:
|
// Run the block on the State:
|
||||||
// + update validator sets
|
// + update validator sets
|
||||||
@ -1423,7 +1422,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
|
|||||||
added, address, err = cs.LastCommit.AddByIndex(valIndex, vote)
|
added, address, err = cs.LastCommit.AddByIndex(valIndex, vote)
|
||||||
if added {
|
if added {
|
||||||
log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort()))
|
log.Info(Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort()))
|
||||||
cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote})
|
types.FireEventVote(cs.evsw, types.EventDataVote{valIndex, address, vote})
|
||||||
|
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@ -1434,7 +1433,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
|
|||||||
height := cs.Height
|
height := cs.Height
|
||||||
added, address, err = cs.Votes.AddByIndex(valIndex, vote, peerKey)
|
added, address, err = cs.Votes.AddByIndex(valIndex, vote, peerKey)
|
||||||
if added {
|
if added {
|
||||||
cs.evsw.FireEvent(types.EventStringVote(), types.EventDataVote{valIndex, address, vote})
|
types.FireEventVote(cs.evsw, types.EventDataVote{valIndex, address, vote})
|
||||||
|
|
||||||
switch vote.Type {
|
switch vote.Type {
|
||||||
case types.VoteTypePrevote:
|
case types.VoteTypePrevote:
|
||||||
@ -1452,7 +1451,7 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
|
|||||||
cs.LockedRound = 0
|
cs.LockedRound = 0
|
||||||
cs.LockedBlock = nil
|
cs.LockedBlock = nil
|
||||||
cs.LockedBlockParts = nil
|
cs.LockedBlockParts = nil
|
||||||
cs.evsw.FireEvent(types.EventStringUnlock(), cs.RoundStateEvent())
|
types.FireEventUnlock(cs.evsw, cs.RoundStateEvent())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() {
|
if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() {
|
||||||
|
@ -6,9 +6,8 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tendermint/tendermint/config/tendermint_test"
|
|
||||||
//"github.com/tendermint/go-events"
|
|
||||||
. "github.com/tendermint/go-common"
|
. "github.com/tendermint/go-common"
|
||||||
|
"github.com/tendermint/tendermint/config/tendermint_test"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -9,7 +9,6 @@ import (
|
|||||||
"github.com/tendermint/go-clist"
|
"github.com/tendermint/go-clist"
|
||||||
. "github.com/tendermint/go-common"
|
. "github.com/tendermint/go-common"
|
||||||
cfg "github.com/tendermint/go-config"
|
cfg "github.com/tendermint/go-config"
|
||||||
"github.com/tendermint/go-events"
|
|
||||||
"github.com/tendermint/go-p2p"
|
"github.com/tendermint/go-p2p"
|
||||||
"github.com/tendermint/go-wire"
|
"github.com/tendermint/go-wire"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
@ -28,7 +27,7 @@ type MempoolReactor struct {
|
|||||||
p2p.BaseReactor
|
p2p.BaseReactor
|
||||||
config cfg.Config
|
config cfg.Config
|
||||||
Mempool *Mempool
|
Mempool *Mempool
|
||||||
evsw *events.EventSwitch
|
evsw types.EventSwitch
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMempoolReactor(config cfg.Config, mempool *Mempool) *MempoolReactor {
|
func NewMempoolReactor(config cfg.Config, mempool *Mempool) *MempoolReactor {
|
||||||
@ -143,7 +142,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// implements events.Eventable
|
// implements events.Eventable
|
||||||
func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) {
|
func (memR *MempoolReactor) SetEventSwitch(evsw types.EventSwitch) {
|
||||||
memR.evsw = evsw
|
memR.evsw = evsw
|
||||||
}
|
}
|
||||||
|
|
||||||
|
11
node/node.go
11
node/node.go
@ -12,7 +12,6 @@ import (
|
|||||||
cfg "github.com/tendermint/go-config"
|
cfg "github.com/tendermint/go-config"
|
||||||
"github.com/tendermint/go-crypto"
|
"github.com/tendermint/go-crypto"
|
||||||
dbm "github.com/tendermint/go-db"
|
dbm "github.com/tendermint/go-db"
|
||||||
"github.com/tendermint/go-events"
|
|
||||||
"github.com/tendermint/go-p2p"
|
"github.com/tendermint/go-p2p"
|
||||||
"github.com/tendermint/go-rpc"
|
"github.com/tendermint/go-rpc"
|
||||||
"github.com/tendermint/go-rpc/server"
|
"github.com/tendermint/go-rpc/server"
|
||||||
@ -32,7 +31,7 @@ import _ "net/http/pprof"
|
|||||||
type Node struct {
|
type Node struct {
|
||||||
config cfg.Config
|
config cfg.Config
|
||||||
sw *p2p.Switch
|
sw *p2p.Switch
|
||||||
evsw *events.EventSwitch
|
evsw types.EventSwitch
|
||||||
blockStore *bc.BlockStore
|
blockStore *bc.BlockStore
|
||||||
bcReactor *bc.BlockchainReactor
|
bcReactor *bc.BlockchainReactor
|
||||||
mempoolReactor *mempl.MempoolReactor
|
mempoolReactor *mempl.MempoolReactor
|
||||||
@ -80,7 +79,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
|
|||||||
privKey := crypto.GenPrivKeyEd25519()
|
privKey := crypto.GenPrivKeyEd25519()
|
||||||
|
|
||||||
// Make event switch
|
// Make event switch
|
||||||
eventSwitch := events.NewEventSwitch()
|
eventSwitch := types.NewEventSwitch()
|
||||||
_, err := eventSwitch.Start()
|
_, err := eventSwitch.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Exit(Fmt("Failed to start switch: %v", err))
|
Exit(Fmt("Failed to start switch: %v", err))
|
||||||
@ -187,7 +186,7 @@ func (n *Node) Stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Add the event switch to reactors, mempool, etc.
|
// Add the event switch to reactors, mempool, etc.
|
||||||
func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) {
|
func SetEventSwitch(evsw types.EventSwitch, eventables ...types.Eventable) {
|
||||||
for _, e := range eventables {
|
for _, e := range eventables {
|
||||||
e.SetEventSwitch(evsw)
|
e.SetEventSwitch(evsw)
|
||||||
}
|
}
|
||||||
@ -252,7 +251,7 @@ func (n *Node) MempoolReactor() *mempl.MempoolReactor {
|
|||||||
return n.mempoolReactor
|
return n.mempoolReactor
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) EventSwitch() *events.EventSwitch {
|
func (n *Node) EventSwitch() types.EventSwitch {
|
||||||
return n.evsw
|
return n.evsw
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -401,7 +400,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState {
|
|||||||
config.Set("chain_id", state.ChainID)
|
config.Set("chain_id", state.ChainID)
|
||||||
|
|
||||||
// Make event switch
|
// Make event switch
|
||||||
eventSwitch := events.NewEventSwitch()
|
eventSwitch := types.NewEventSwitch()
|
||||||
_, err := eventSwitch.Start()
|
_, err := eventSwitch.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Exit(Fmt("Failed to start event switch: %v", err))
|
Exit(Fmt("Failed to start event switch: %v", err))
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package core
|
package core
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/tendermint/go-events"
|
|
||||||
"github.com/tendermint/go-rpc/types"
|
"github.com/tendermint/go-rpc/types"
|
||||||
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
@ -9,10 +8,10 @@ import (
|
|||||||
|
|
||||||
func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) {
|
func Subscribe(wsCtx rpctypes.WSRPCContext, event string) (*ctypes.ResultSubscribe, error) {
|
||||||
log.Notice("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event)
|
log.Notice("Subscribe to event", "remote", wsCtx.GetRemoteAddr(), "event", event)
|
||||||
wsCtx.GetEventSwitch().AddListenerForEvent(wsCtx.GetRemoteAddr(), event, func(msg events.EventData) {
|
types.AddListenerForEvent(wsCtx.GetEventSwitch(), wsCtx.GetRemoteAddr(), event, func(msg types.TMEventData) {
|
||||||
// NOTE: EventSwitch callbacks must be nonblocking
|
// NOTE: EventSwitch callbacks must be nonblocking
|
||||||
// NOTE: RPCResponses of subscribed events have id suffix "#event"
|
// NOTE: RPCResponses of subscribed events have id suffix "#event"
|
||||||
tmResult := ctypes.TMResult(&ctypes.ResultEvent{event, types.TMEventData(msg)})
|
tmResult := ctypes.TMResult(&ctypes.ResultEvent{event, msg})
|
||||||
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &tmResult, ""))
|
wsCtx.TryWriteRPCResponse(rpctypes.NewRPCResponse(wsCtx.Request.ID+"#event", &tmResult, ""))
|
||||||
})
|
})
|
||||||
return &ctypes.ResultSubscribe{}, nil
|
return &ctypes.ResultSubscribe{}, nil
|
||||||
|
@ -4,7 +4,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tendermint/go-events"
|
|
||||||
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
tmsp "github.com/tendermint/tmsp/types"
|
tmsp "github.com/tendermint/tmsp/types"
|
||||||
@ -53,7 +52,7 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
|
|||||||
|
|
||||||
// subscribe to tx being committed in block
|
// subscribe to tx being committed in block
|
||||||
appendTxResCh := make(chan types.EventDataTx, 1)
|
appendTxResCh := make(chan types.EventDataTx, 1)
|
||||||
eventSwitch.AddListenerForEvent("rpc", types.EventStringTx(tx), func(data events.EventData) {
|
types.AddListenerForEvent(eventSwitch, "rpc", types.EventStringTx(tx), func(data types.TMEventData) {
|
||||||
appendTxResCh <- data.(types.EventDataTx)
|
appendTxResCh <- data.(types.EventDataTx)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -4,7 +4,6 @@ import (
|
|||||||
cfg "github.com/tendermint/go-config"
|
cfg "github.com/tendermint/go-config"
|
||||||
"github.com/tendermint/go-p2p"
|
"github.com/tendermint/go-p2p"
|
||||||
|
|
||||||
"github.com/tendermint/go-events"
|
|
||||||
bc "github.com/tendermint/tendermint/blockchain"
|
bc "github.com/tendermint/tendermint/blockchain"
|
||||||
"github.com/tendermint/tendermint/consensus"
|
"github.com/tendermint/tendermint/consensus"
|
||||||
mempl "github.com/tendermint/tendermint/mempool"
|
mempl "github.com/tendermint/tendermint/mempool"
|
||||||
@ -12,7 +11,7 @@ import (
|
|||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
var eventSwitch *events.EventSwitch
|
var eventSwitch types.EventSwitch
|
||||||
var blockStore *bc.BlockStore
|
var blockStore *bc.BlockStore
|
||||||
var consensusState *consensus.ConsensusState
|
var consensusState *consensus.ConsensusState
|
||||||
var consensusReactor *consensus.ConsensusReactor
|
var consensusReactor *consensus.ConsensusReactor
|
||||||
@ -28,7 +27,7 @@ func SetConfig(c cfg.Config) {
|
|||||||
config = c
|
config = c
|
||||||
}
|
}
|
||||||
|
|
||||||
func SetEventSwitch(evsw *events.EventSwitch) {
|
func SetEventSwitch(evsw types.EventSwitch) {
|
||||||
eventSwitch = evsw
|
eventSwitch = evsw
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5,7 +5,6 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
. "github.com/tendermint/go-common"
|
. "github.com/tendermint/go-common"
|
||||||
"github.com/tendermint/go-events"
|
|
||||||
"github.com/tendermint/tendermint/proxy"
|
"github.com/tendermint/tendermint/proxy"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
tmsp "github.com/tendermint/tmsp/types"
|
tmsp "github.com/tendermint/tmsp/types"
|
||||||
@ -18,7 +17,7 @@ func (s *State) ValidateBlock(block *types.Block) error {
|
|||||||
|
|
||||||
// Execute the block to mutate State.
|
// Execute the block to mutate State.
|
||||||
// Validates block and then executes Data.Txs in the block.
|
// Validates block and then executes Data.Txs in the block.
|
||||||
func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error {
|
func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error {
|
||||||
|
|
||||||
// Validate the block.
|
// Validate the block.
|
||||||
err := s.validateBlock(block)
|
err := s.validateBlock(block)
|
||||||
@ -55,7 +54,7 @@ func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConn
|
|||||||
|
|
||||||
// Executes block's transactions on proxyAppConn.
|
// Executes block's transactions on proxyAppConn.
|
||||||
// TODO: Generate a bitmap or otherwise store tx validity in state.
|
// TODO: Generate a bitmap or otherwise store tx validity in state.
|
||||||
func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) error {
|
func (s *State) execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) error {
|
||||||
|
|
||||||
var validTxs, invalidTxs = 0, 0
|
var validTxs, invalidTxs = 0, 0
|
||||||
|
|
||||||
@ -74,7 +73,7 @@ func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn pro
|
|||||||
} else {
|
} else {
|
||||||
log.Debug("Invalid tx", "code", r.AppendTx.Code, "log", r.AppendTx.Log)
|
log.Debug("Invalid tx", "code", r.AppendTx.Code, "log", r.AppendTx.Log)
|
||||||
invalidTxs += 1
|
invalidTxs += 1
|
||||||
txError = tmsp.CodeType_name[int32(apTx.Code)] // TODO
|
txError = apTx.Code.String()
|
||||||
}
|
}
|
||||||
// NOTE: if we count we can access the tx from the block instead of
|
// NOTE: if we count we can access the tx from the block instead of
|
||||||
// pulling it from the req
|
// pulling it from the req
|
||||||
@ -85,7 +84,7 @@ func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn pro
|
|||||||
Log: apTx.Log,
|
Log: apTx.Log,
|
||||||
Error: txError,
|
Error: txError,
|
||||||
}
|
}
|
||||||
eventCache.FireEvent(types.EventStringTx(req.GetAppendTx().Tx), event)
|
types.FireEventTx(eventCache, event)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
proxyAppConn.SetResponseCallback(proxyCb)
|
proxyAppConn.SetResponseCallback(proxyCb)
|
||||||
|
@ -36,7 +36,7 @@ func EventStringVote() string { return "Vote" }
|
|||||||
// implements events.EventData
|
// implements events.EventData
|
||||||
type TMEventData interface {
|
type TMEventData interface {
|
||||||
events.EventData
|
events.EventData
|
||||||
// AssertIsTMEventData()
|
AssertIsTMEventData()
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -101,3 +101,99 @@ func (_ EventDataNewBlockHeader) AssertIsTMEventData() {}
|
|||||||
func (_ EventDataTx) AssertIsTMEventData() {}
|
func (_ EventDataTx) AssertIsTMEventData() {}
|
||||||
func (_ EventDataRoundState) AssertIsTMEventData() {}
|
func (_ EventDataRoundState) AssertIsTMEventData() {}
|
||||||
func (_ EventDataVote) AssertIsTMEventData() {}
|
func (_ EventDataVote) AssertIsTMEventData() {}
|
||||||
|
|
||||||
|
//----------------------------------------
|
||||||
|
// Wrappers for type safety
|
||||||
|
|
||||||
|
type Fireable interface {
|
||||||
|
events.Fireable
|
||||||
|
}
|
||||||
|
|
||||||
|
type Eventable interface {
|
||||||
|
SetEventSwitch(EventSwitch)
|
||||||
|
}
|
||||||
|
|
||||||
|
type EventSwitch interface {
|
||||||
|
events.EventSwitch
|
||||||
|
}
|
||||||
|
|
||||||
|
type EventCache interface {
|
||||||
|
Fireable
|
||||||
|
Flush()
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEventSwitch() EventSwitch {
|
||||||
|
return events.NewEventSwitch()
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEventCache(evsw EventSwitch) EventCache {
|
||||||
|
return events.NewEventCache(evsw)
|
||||||
|
}
|
||||||
|
|
||||||
|
// All events should be based on this FireEvent to ensure they are TMEventData
|
||||||
|
func fireEvent(fireable events.Fireable, event string, data TMEventData) {
|
||||||
|
fireable.FireEvent(event, data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func AddListenerForEvent(evsw EventSwitch, id, event string, cb func(data TMEventData)) {
|
||||||
|
evsw.AddListenerForEvent(id, event, func(data events.EventData) {
|
||||||
|
cb(data.(TMEventData))
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
//--- block, tx, and vote events
|
||||||
|
|
||||||
|
func FireEventNewBlock(fireable events.Fireable, block EventDataNewBlock) {
|
||||||
|
fireEvent(fireable, EventStringNewBlock(), block)
|
||||||
|
}
|
||||||
|
|
||||||
|
func FireEventNewBlockHeader(fireable events.Fireable, header EventDataNewBlockHeader) {
|
||||||
|
fireEvent(fireable, EventStringNewBlockHeader(), header)
|
||||||
|
}
|
||||||
|
|
||||||
|
func FireEventVote(fireable events.Fireable, vote EventDataVote) {
|
||||||
|
fireEvent(fireable, EventStringVote(), vote)
|
||||||
|
}
|
||||||
|
|
||||||
|
func FireEventTx(fireable events.Fireable, tx EventDataTx) {
|
||||||
|
fireEvent(fireable, EventStringTx(tx.Tx), tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
//--- EventDataRoundState events
|
||||||
|
|
||||||
|
func FireEventNewRoundStep(fireable events.Fireable, rs EventDataRoundState) {
|
||||||
|
fireEvent(fireable, EventStringNewRoundStep(), rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func FireEventTimeoutPropose(fireable events.Fireable, rs EventDataRoundState) {
|
||||||
|
fireEvent(fireable, EventStringTimeoutPropose(), rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func FireEventTimeoutWait(fireable events.Fireable, rs EventDataRoundState) {
|
||||||
|
fireEvent(fireable, EventStringTimeoutWait(), rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func FireEventNewRound(fireable events.Fireable, rs EventDataRoundState) {
|
||||||
|
fireEvent(fireable, EventStringNewRound(), rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func FireEventCompleteProposal(fireable events.Fireable, rs EventDataRoundState) {
|
||||||
|
fireEvent(fireable, EventStringCompleteProposal(), rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func FireEventPolka(fireable events.Fireable, rs EventDataRoundState) {
|
||||||
|
fireEvent(fireable, EventStringPolka(), rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func FireEventUnlock(fireable events.Fireable, rs EventDataRoundState) {
|
||||||
|
fireEvent(fireable, EventStringUnlock(), rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func FireEventRelock(fireable events.Fireable, rs EventDataRoundState) {
|
||||||
|
fireEvent(fireable, EventStringRelock(), rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func FireEventLock(fireable events.Fireable, rs EventDataRoundState) {
|
||||||
|
fireEvent(fireable, EventStringLock(), rs)
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user