mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-25 06:42:16 +00:00
node: allow replacing existing p2p.Reactor(s) (#3846)
* node: allow replacing existing p2p.Reactor(s) using [`CustomReactors` option](https://godoc.org/github.com/tendermint/tendermint/node#CustomReactors). Warning: beware of accidental name clashes. Here is the list of existing reactors: MEMPOOL, BLOCKCHAIN, CONSENSUS, EVIDENCE, PEX. * check the absence of "CUSTOM" prefix * merge 2 tests * add doc.go to node package
This commit is contained in:
parent
1e3364a014
commit
88e0973f7d
@ -17,6 +17,10 @@ program](https://hackerone.com/tendermint).
|
|||||||
- [libs] \#3811 Remove `db` from libs in favor of `https://github.com/tendermint/tm-cmn`
|
- [libs] \#3811 Remove `db` from libs in favor of `https://github.com/tendermint/tm-cmn`
|
||||||
|
|
||||||
### FEATURES:
|
### FEATURES:
|
||||||
|
- [node] Allow replacing existing p2p.Reactor(s) using [`CustomReactors`
|
||||||
|
option](https://godoc.org/github.com/tendermint/tendermint/node#CustomReactors).
|
||||||
|
Warning: beware of accidental name clashes. Here is the list of existing
|
||||||
|
reactors: MEMPOOL, BLOCKCHAIN, CONSENSUS, EVIDENCE, PEX.
|
||||||
|
|
||||||
### IMPROVEMENTS:
|
### IMPROVEMENTS:
|
||||||
|
|
||||||
|
40
node/doc.go
Normal file
40
node/doc.go
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
/*
|
||||||
|
Package node is the main entry point, where the Node struct, which
|
||||||
|
represents a full node, is defined.
|
||||||
|
|
||||||
|
Adding new p2p.Reactor(s)
|
||||||
|
|
||||||
|
To add a new p2p.Reactor, use the CustomReactors option:
|
||||||
|
|
||||||
|
node, err := NewNode(
|
||||||
|
config,
|
||||||
|
privVal,
|
||||||
|
nodeKey,
|
||||||
|
clientCreator,
|
||||||
|
genesisDocProvider,
|
||||||
|
dbProvider,
|
||||||
|
metricsProvider,
|
||||||
|
logger,
|
||||||
|
CustomReactors(map[string]p2p.Reactor{"CUSTOM": customReactor}),
|
||||||
|
)
|
||||||
|
|
||||||
|
Replacing existing p2p.Reactor(s)
|
||||||
|
|
||||||
|
To replace the built-in p2p.Reactor, use the CustomReactors option:
|
||||||
|
|
||||||
|
node, err := NewNode(
|
||||||
|
config,
|
||||||
|
privVal,
|
||||||
|
nodeKey,
|
||||||
|
clientCreator,
|
||||||
|
genesisDocProvider,
|
||||||
|
dbProvider,
|
||||||
|
metricsProvider,
|
||||||
|
logger,
|
||||||
|
CustomReactors(map[string]p2p.Reactor{"BLOCKCHAIN": customBlockchainReactor}),
|
||||||
|
)
|
||||||
|
|
||||||
|
The list of existing reactors can be found in CustomReactors documentation.
|
||||||
|
|
||||||
|
*/
|
||||||
|
package node
|
23
node/node.go
23
node/node.go
@ -48,10 +48,6 @@ import (
|
|||||||
dbm "github.com/tendermint/tm-cmn/db"
|
dbm "github.com/tendermint/tm-cmn/db"
|
||||||
)
|
)
|
||||||
|
|
||||||
// CustomReactorNamePrefix is a prefix for all custom reactors to prevent
|
|
||||||
// clashes with built-in reactors.
|
|
||||||
const CustomReactorNamePrefix = "CUSTOM_"
|
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
// DBContext specifies config information for loading a new DB.
|
// DBContext specifies config information for loading a new DB.
|
||||||
@ -144,11 +140,26 @@ func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
|
|||||||
// Option sets a parameter for the node.
|
// Option sets a parameter for the node.
|
||||||
type Option func(*Node)
|
type Option func(*Node)
|
||||||
|
|
||||||
// CustomReactors allows you to add custom reactors to the node's Switch.
|
// CustomReactors allows you to add custom reactors (name -> p2p.Reactor) to
|
||||||
|
// the node's Switch.
|
||||||
|
//
|
||||||
|
// WARNING: using any name from the below list of the existing reactors will
|
||||||
|
// result in replacing it with the custom one.
|
||||||
|
//
|
||||||
|
// - MEMPOOL
|
||||||
|
// - BLOCKCHAIN
|
||||||
|
// - CONSENSUS
|
||||||
|
// - EVIDENCE
|
||||||
|
// - PEX
|
||||||
func CustomReactors(reactors map[string]p2p.Reactor) Option {
|
func CustomReactors(reactors map[string]p2p.Reactor) Option {
|
||||||
return func(n *Node) {
|
return func(n *Node) {
|
||||||
for name, reactor := range reactors {
|
for name, reactor := range reactors {
|
||||||
n.sw.AddReactor(CustomReactorNamePrefix+name, reactor)
|
if existingReactor := n.sw.Reactor(name); existingReactor != nil {
|
||||||
|
n.sw.Logger.Info("Replacing existing reactor with a custom one",
|
||||||
|
"name", name, "existing", existingReactor, "custom", reactor)
|
||||||
|
n.sw.RemoveReactor(name, existingReactor)
|
||||||
|
}
|
||||||
|
n.sw.AddReactor(name, reactor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -288,6 +288,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
|
|||||||
defer os.RemoveAll(config.RootDir)
|
defer os.RemoveAll(config.RootDir)
|
||||||
|
|
||||||
cr := p2pmock.NewReactor()
|
cr := p2pmock.NewReactor()
|
||||||
|
customBlockchainReactor := p2pmock.NewReactor()
|
||||||
|
|
||||||
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
|
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -300,7 +301,7 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
|
|||||||
DefaultDBProvider,
|
DefaultDBProvider,
|
||||||
DefaultMetricsProvider(config.Instrumentation),
|
DefaultMetricsProvider(config.Instrumentation),
|
||||||
log.TestingLogger(),
|
log.TestingLogger(),
|
||||||
CustomReactors(map[string]p2p.Reactor{"FOO": cr}),
|
CustomReactors(map[string]p2p.Reactor{"FOO": cr, "BLOCKCHAIN": customBlockchainReactor}),
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@ -309,6 +310,10 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
|
|||||||
defer n.Stop()
|
defer n.Stop()
|
||||||
|
|
||||||
assert.True(t, cr.IsRunning())
|
assert.True(t, cr.IsRunning())
|
||||||
|
assert.Equal(t, cr, n.Switch().Reactor("FOO"))
|
||||||
|
|
||||||
|
assert.True(t, customBlockchainReactor.IsRunning())
|
||||||
|
assert.Equal(t, customBlockchainReactor, n.Switch().Reactor("BLOCKCHAIN"))
|
||||||
}
|
}
|
||||||
|
|
||||||
func state(nVals int, height int64) (sm.State, dbm.DB) {
|
func state(nVals int, height int64) (sm.State, dbm.DB) {
|
||||||
|
@ -152,11 +152,9 @@ func WithMetrics(metrics *Metrics) SwitchOption {
|
|||||||
// AddReactor adds the given reactor to the switch.
|
// AddReactor adds the given reactor to the switch.
|
||||||
// NOTE: Not goroutine safe.
|
// NOTE: Not goroutine safe.
|
||||||
func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
|
func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
|
||||||
// Validate the reactor.
|
for _, chDesc := range reactor.GetChannels() {
|
||||||
// No two reactors can share the same channel.
|
|
||||||
reactorChannels := reactor.GetChannels()
|
|
||||||
for _, chDesc := range reactorChannels {
|
|
||||||
chID := chDesc.ID
|
chID := chDesc.ID
|
||||||
|
// No two reactors can share the same channel.
|
||||||
if sw.reactorsByCh[chID] != nil {
|
if sw.reactorsByCh[chID] != nil {
|
||||||
panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
|
panic(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
|
||||||
}
|
}
|
||||||
@ -168,6 +166,23 @@ func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
|
|||||||
return reactor
|
return reactor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RemoveReactor removes the given Reactor from the Switch.
|
||||||
|
// NOTE: Not goroutine safe.
|
||||||
|
func (sw *Switch) RemoveReactor(name string, reactor Reactor) {
|
||||||
|
for _, chDesc := range reactor.GetChannels() {
|
||||||
|
// remove channel description
|
||||||
|
for i := 0; i < len(sw.chDescs); i++ {
|
||||||
|
if chDesc.ID == sw.chDescs[i].ID {
|
||||||
|
sw.chDescs = append(sw.chDescs[:i], sw.chDescs[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
delete(sw.reactorsByCh, chDesc.ID)
|
||||||
|
}
|
||||||
|
delete(sw.reactors, name)
|
||||||
|
reactor.SetSwitch(nil)
|
||||||
|
}
|
||||||
|
|
||||||
// Reactors returns a map of reactors registered on the switch.
|
// Reactors returns a map of reactors registered on the switch.
|
||||||
// NOTE: Not goroutine safe.
|
// NOTE: Not goroutine safe.
|
||||||
func (sw *Switch) Reactors() map[string]Reactor {
|
func (sw *Switch) Reactors() map[string]Reactor {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user