proxy: wrap NewTMSPClient in ClientCreator

This commit is contained in:
Ethan Buchman
2016-09-10 17:14:55 -04:00
parent 41918d619c
commit caeda30b72
4 changed files with 87 additions and 50 deletions

View File

@ -48,10 +48,10 @@ func NewNodeDefault(config cfg.Config) *Node {
// Get PrivValidator // Get PrivValidator
privValidatorFile := config.GetString("priv_validator_file") privValidatorFile := config.GetString("priv_validator_file")
privValidator := types.LoadOrGenPrivValidator(privValidatorFile) privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
return NewNode(config, privValidator, proxy.NewTMSPClientDefault) return NewNode(config, privValidator, proxy.DefaultClientCreator(config))
} }
func NewNode(config cfg.Config, privValidator *types.PrivValidator, newTMSPClient proxy.NewTMSPClient) *Node { func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreator proxy.ClientCreator) *Node {
EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here
@ -67,7 +67,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, newTMSPClien
// Create the proxyApp, which houses three connections: // Create the proxyApp, which houses three connections:
// query, consensus, and mempool // query, consensus, and mempool
proxyApp := proxy.NewAppConns(config, newTMSPClient, state, blockStore) proxyApp := proxy.NewAppConns(config, clientCreator, state, blockStore)
// add the chainid and number of validators to the global config // add the chainid and number of validators to the global config
config.Set("chain_id", state.ChainID) config.Set("chain_id", state.ChainID)
@ -313,7 +313,7 @@ func getState(config cfg.Config, stateDB dbm.DB) *sm.State {
// * supply an in-proc tmsp app // * supply an in-proc tmsp app
// should fork tendermint/tendermint and implement RunNode to // should fork tendermint/tendermint and implement RunNode to
// call NewNode with their custom priv validator and/or custom // call NewNode with their custom priv validator and/or custom
// proxy.NewTMSPClient function. // proxy.ClientCreator interface
func RunNode(config cfg.Config) { func RunNode(config cfg.Config) {
// Wait until the genesis doc becomes available // Wait until the genesis doc becomes available
genDocFile := config.GetString("genesis_file") genDocFile := config.GetString("genesis_file")
@ -392,7 +392,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState {
// Create two proxyAppConn connections, // Create two proxyAppConn connections,
// one for the consensus and one for the mempool. // one for the consensus and one for the mempool.
proxyApp := proxy.NewAppConns(config, proxy.NewTMSPClientDefault, state, blockStore) proxyApp := proxy.NewAppConns(config, proxy.DefaultClientCreator(config), state, blockStore)
// add the chainid to the global config // add the chainid to the global config
config.Set("chain_id", state.ChainID) config.Set("chain_id", state.ChainID)

View File

@ -45,6 +45,7 @@ var SOCKET = "socket"
func TestEcho(t *testing.T) { func TestEcho(t *testing.T) {
sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6))
clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true)
// Start server // Start server
s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
@ -53,7 +54,7 @@ func TestEcho(t *testing.T) {
} }
defer s.Stop() defer s.Stop()
// Start client // Start client
cli, err := NewTMSPClientDefault(sockPath, SOCKET) cli, err := clientCreator.NewTMSPClient()
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} }
@ -69,6 +70,7 @@ func TestEcho(t *testing.T) {
func BenchmarkEcho(b *testing.B) { func BenchmarkEcho(b *testing.B) {
b.StopTimer() // Initialize b.StopTimer() // Initialize
sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6))
clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true)
// Start server // Start server
s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
if err != nil { if err != nil {
@ -76,7 +78,7 @@ func BenchmarkEcho(b *testing.B) {
} }
defer s.Stop() defer s.Stop()
// Start client // Start client
cli, err := NewTMSPClientDefault(sockPath, SOCKET) cli, err := clientCreator.NewTMSPClient()
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} }
@ -97,6 +99,7 @@ func BenchmarkEcho(b *testing.B) {
func TestInfo(t *testing.T) { func TestInfo(t *testing.T) {
sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6)) sockPath := Fmt("unix:///tmp/echo_%v.sock", RandStr(6))
clientCreator := NewRemoteClientCreator(sockPath, SOCKET, true)
// Start server // Start server
s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication()) s, err := server.NewSocketServer(sockPath, dummy.NewDummyApplication())
if err != nil { if err != nil {
@ -104,7 +107,7 @@ func TestInfo(t *testing.T) {
} }
defer s.Stop() defer s.Stop()
// Start client // Start client
cli, err := NewTMSPClientDefault(sockPath, SOCKET) cli, err := clientCreator.NewTMSPClient()
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} }

View File

@ -4,40 +4,77 @@ import (
"fmt" "fmt"
"sync" "sync"
cfg "github.com/tendermint/go-config"
tmspcli "github.com/tendermint/tmsp/client" tmspcli "github.com/tendermint/tmsp/client"
"github.com/tendermint/tmsp/example/dummy" "github.com/tendermint/tmsp/example/dummy"
nilapp "github.com/tendermint/tmsp/example/nil" nilapp "github.com/tendermint/tmsp/example/nil"
"github.com/tendermint/tmsp/types"
) )
// Function type to get a connected tmsp client // NewTMSPClient returns newly connected client
// Allows consumers to provide their own in-proc apps, type ClientCreator interface {
// or to implement alternate address schemes and transports NewTMSPClient() (tmspcli.Client, error)
type NewTMSPClient func(addr, transport string) (tmspcli.Client, error) }
// Get a connected tmsp client. //----------------------------------------------------
// Offers some default in-proc apps, else socket/grpc. // local proxy uses a mutex on an in-proc app
func NewTMSPClientDefault(addr, transport string) (tmspcli.Client, error) {
var client tmspcli.Client type localClientCreator struct {
mtx *sync.Mutex
// use local app (for testing) app types.Application
// TODO: local proxy app conn }
switch addr {
case "nilapp": func NewLocalClientCreator(app types.Application) ClientCreator {
app := nilapp.NewNilApplication() return &localClientCreator{
mtx := new(sync.Mutex) // TODO mtx: new(sync.Mutex),
client = tmspcli.NewLocalClient(mtx, app) app: app,
case "dummy": }
app := dummy.NewDummyApplication() }
mtx := new(sync.Mutex) // TODO
client = tmspcli.NewLocalClient(mtx, app) func (l *localClientCreator) NewTMSPClient() (tmspcli.Client, error) {
default: return tmspcli.NewLocalClient(l.mtx, l.app), nil
// Run forever in a loop }
mustConnect := false
remoteApp, err := tmspcli.NewClient(addr, transport, mustConnect) //---------------------------------------------------------------
if err != nil { // remote proxy opens new connections to an external app process
return nil, fmt.Errorf("Failed to connect to proxy for mempool: %v", err)
} type remoteClientCreator struct {
client = remoteApp addr string
} transport string
return client, nil mustConnect bool
}
func NewRemoteClientCreator(addr, transport string, mustConnect bool) ClientCreator {
return &remoteClientCreator{
addr: addr,
transport: transport,
mustConnect: mustConnect,
}
}
func (r *remoteClientCreator) NewTMSPClient() (tmspcli.Client, error) {
// Run forever in a loop
remoteApp, err := tmspcli.NewClient(r.addr, r.transport, r.mustConnect)
if err != nil {
return nil, fmt.Errorf("Failed to connect to proxy: %v", err)
}
return remoteApp, nil
}
//-----------------------------------------------------------------
// default
func DefaultClientCreator(config cfg.Config) ClientCreator {
addr := config.GetString("proxy_app")
transport := config.GetString("tmsp")
switch addr {
case "dummy":
return NewLocalClientCreator(dummy.NewDummyApplication())
case "nil":
return NewLocalClientCreator(nilapp.NewNilApplication())
default:
mustConnect := true
return NewRemoteClientCreator(addr, transport, mustConnect)
}
} }

View File

@ -12,8 +12,8 @@ type AppConns interface {
Query() AppConnQuery Query() AppConnQuery
} }
func NewAppConns(config cfg.Config, newTMSPClient NewTMSPClient, state State, blockStore BlockStore) AppConns { func NewAppConns(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) AppConns {
return NewMultiAppConn(config, newTMSPClient, state, blockStore) return NewMultiAppConn(config, clientCreator, state, blockStore)
} }
// a multiAppConn is made of a few appConns (mempool, consensus, query) // a multiAppConn is made of a few appConns (mempool, consensus, query)
@ -30,16 +30,16 @@ type multiAppConn struct {
consensusConn *appConnConsensus consensusConn *appConnConsensus
queryConn *appConnQuery queryConn *appConnQuery
newTMSPClient NewTMSPClient clientCreator ClientCreator
} }
// Make all necessary tmsp connections to the application // Make all necessary tmsp connections to the application
func NewMultiAppConn(config cfg.Config, newTMSPClient NewTMSPClient, state State, blockStore BlockStore) *multiAppConn { func NewMultiAppConn(config cfg.Config, clientCreator ClientCreator, state State, blockStore BlockStore) *multiAppConn {
multiAppConn := &multiAppConn{ multiAppConn := &multiAppConn{
config: config, config: config,
state: state, state: state,
blockStore: blockStore, blockStore: blockStore,
newTMSPClient: newTMSPClient, clientCreator: clientCreator,
} }
multiAppConn.QuitService = *NewQuitService(log, "multiAppConn", multiAppConn) multiAppConn.QuitService = *NewQuitService(log, "multiAppConn", multiAppConn)
multiAppConn.Start() multiAppConn.Start()
@ -63,25 +63,22 @@ func (app *multiAppConn) Query() AppConnQuery {
func (app *multiAppConn) OnStart() error { func (app *multiAppConn) OnStart() error {
app.QuitService.OnStart() app.QuitService.OnStart()
addr := app.config.GetString("proxy_app")
transport := app.config.GetString("tmsp")
// query connection // query connection
querycli, err := app.newTMSPClient(addr, transport) querycli, err := app.clientCreator.NewTMSPClient()
if err != nil { if err != nil {
return err return err
} }
app.queryConn = NewAppConnQuery(querycli) app.queryConn = NewAppConnQuery(querycli)
// mempool connection // mempool connection
memcli, err := app.newTMSPClient(addr, transport) memcli, err := app.clientCreator.NewTMSPClient()
if err != nil { if err != nil {
return err return err
} }
app.mempoolConn = NewAppConnMempool(memcli) app.mempoolConn = NewAppConnMempool(memcli)
// consensus connection // consensus connection
concli, err := app.newTMSPClient(addr, transport) concli, err := app.clientCreator.NewTMSPClient()
if err != nil { if err != nil {
return err return err
} }