mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-03 16:41:19 +00:00
commit
f41bc5f119
@ -1,8 +1,10 @@
|
|||||||
package tmspcli
|
package tmspcli
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/tendermint/tmsp/types"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/tendermint/tmsp/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Client interface {
|
type Client interface {
|
||||||
@ -39,6 +41,21 @@ type Client interface {
|
|||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
|
|
||||||
|
func NewClient(addr, transport string, mustConnect bool) (client Client, err error) {
|
||||||
|
switch transport {
|
||||||
|
case "socket":
|
||||||
|
client, err = NewSocketClient(addr, mustConnect)
|
||||||
|
case "grpc":
|
||||||
|
client, err = NewGRPCClient(addr, mustConnect)
|
||||||
|
default:
|
||||||
|
err = fmt.Errorf("Unknown tmsp transport %s", transport)
|
||||||
|
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
//----------------------------------------
|
||||||
|
|
||||||
type Callback func(*types.Request, *types.Response)
|
type Callback func(*types.Request, *types.Response)
|
||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
|
296
client/grpc_client.go
Normal file
296
client/grpc_client.go
Normal file
@ -0,0 +1,296 @@
|
|||||||
|
package tmspcli
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
context "golang.org/x/net/context"
|
||||||
|
grpc "google.golang.org/grpc"
|
||||||
|
|
||||||
|
. "github.com/tendermint/go-common"
|
||||||
|
"github.com/tendermint/tmsp/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// A stripped copy of the remoteClient that makes
|
||||||
|
// synchronous calls using grpc
|
||||||
|
type grpcClient struct {
|
||||||
|
QuitService
|
||||||
|
mustConnect bool
|
||||||
|
|
||||||
|
client types.TMSPApplicationClient
|
||||||
|
|
||||||
|
mtx sync.Mutex
|
||||||
|
addr string
|
||||||
|
err error
|
||||||
|
resCb func(*types.Request, *types.Response) // listens to all callbacks
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewGRPCClient(addr string, mustConnect bool) (*grpcClient, error) {
|
||||||
|
cli := &grpcClient{
|
||||||
|
addr: addr,
|
||||||
|
mustConnect: mustConnect,
|
||||||
|
}
|
||||||
|
cli.QuitService = *NewQuitService(nil, "grpcClient", cli)
|
||||||
|
_, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
|
||||||
|
return cli, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
|
||||||
|
return Connect(addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) OnStart() error {
|
||||||
|
cli.QuitService.OnStart()
|
||||||
|
RETRY_LOOP:
|
||||||
|
for {
|
||||||
|
conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
|
||||||
|
if err != nil {
|
||||||
|
if cli.mustConnect {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
fmt.Printf("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr)
|
||||||
|
time.Sleep(time.Second * 3)
|
||||||
|
continue RETRY_LOOP
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cli.client = types.NewTMSPApplicationClient(conn)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) OnStop() {
|
||||||
|
cli.QuitService.OnStop()
|
||||||
|
// TODO: how to close when TMSPApplicationClient interface doesn't expose Close ?
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set listener for all responses
|
||||||
|
// NOTE: callback may get internally generated flush responses.
|
||||||
|
func (cli *grpcClient) SetResponseCallback(resCb Callback) {
|
||||||
|
cli.mtx.Lock()
|
||||||
|
defer cli.mtx.Unlock()
|
||||||
|
cli.resCb = resCb
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) StopForError(err error) {
|
||||||
|
cli.mtx.Lock()
|
||||||
|
fmt.Printf("Stopping tmsp.grpcClient for error: %v\n", err.Error())
|
||||||
|
if cli.err == nil {
|
||||||
|
cli.err = err
|
||||||
|
}
|
||||||
|
cli.mtx.Unlock()
|
||||||
|
cli.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) Error() error {
|
||||||
|
cli.mtx.Lock()
|
||||||
|
defer cli.mtx.Unlock()
|
||||||
|
return cli.err
|
||||||
|
}
|
||||||
|
|
||||||
|
//----------------------------------------
|
||||||
|
// async calls are really sync.
|
||||||
|
// maybe one day, if people really want it, we use grpc streams,
|
||||||
|
// but hopefully not :D
|
||||||
|
|
||||||
|
func (cli *grpcClient) EchoAsync(msg string) *ReqRes {
|
||||||
|
req := types.ToRequestEcho(msg)
|
||||||
|
res, err := cli.client.Echo(context.Background(), req.GetEcho())
|
||||||
|
if err != nil {
|
||||||
|
cli.err = err
|
||||||
|
}
|
||||||
|
return cli.finishAsyncCall(req, &types.Response{&types.Response_Echo{res}})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) FlushAsync() *ReqRes {
|
||||||
|
req := types.ToRequestFlush()
|
||||||
|
res, err := cli.client.Flush(context.Background(), req.GetFlush())
|
||||||
|
if err != nil {
|
||||||
|
cli.err = err
|
||||||
|
}
|
||||||
|
return cli.finishAsyncCall(req, &types.Response{&types.Response_Flush{res}})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) InfoAsync() *ReqRes {
|
||||||
|
req := types.ToRequestInfo()
|
||||||
|
res, err := cli.client.Info(context.Background(), req.GetInfo())
|
||||||
|
if err != nil {
|
||||||
|
cli.err = err
|
||||||
|
}
|
||||||
|
return cli.finishAsyncCall(req, &types.Response{&types.Response_Info{res}})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes {
|
||||||
|
req := types.ToRequestSetOption(key, value)
|
||||||
|
res, err := cli.client.SetOption(context.Background(), req.GetSetOption())
|
||||||
|
if err != nil {
|
||||||
|
cli.err = err
|
||||||
|
}
|
||||||
|
return cli.finishAsyncCall(req, &types.Response{&types.Response_SetOption{res}})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) AppendTxAsync(tx []byte) *ReqRes {
|
||||||
|
req := types.ToRequestAppendTx(tx)
|
||||||
|
res, err := cli.client.AppendTx(context.Background(), req.GetAppendTx())
|
||||||
|
if err != nil {
|
||||||
|
cli.err = err
|
||||||
|
}
|
||||||
|
return cli.finishAsyncCall(req, &types.Response{&types.Response_AppendTx{res}})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes {
|
||||||
|
req := types.ToRequestCheckTx(tx)
|
||||||
|
res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx())
|
||||||
|
if err != nil {
|
||||||
|
cli.err = err
|
||||||
|
}
|
||||||
|
return cli.finishAsyncCall(req, &types.Response{&types.Response_CheckTx{res}})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) QueryAsync(query []byte) *ReqRes {
|
||||||
|
req := types.ToRequestQuery(query)
|
||||||
|
res, err := cli.client.Query(context.Background(), req.GetQuery())
|
||||||
|
if err != nil {
|
||||||
|
cli.err = err
|
||||||
|
}
|
||||||
|
return cli.finishAsyncCall(req, &types.Response{&types.Response_Query{res}})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) CommitAsync() *ReqRes {
|
||||||
|
req := types.ToRequestCommit()
|
||||||
|
res, err := cli.client.Commit(context.Background(), req.GetCommit())
|
||||||
|
if err != nil {
|
||||||
|
cli.err = err
|
||||||
|
}
|
||||||
|
return cli.finishAsyncCall(req, &types.Response{&types.Response_Commit{res}})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes {
|
||||||
|
req := types.ToRequestInitChain(validators)
|
||||||
|
res, err := cli.client.InitChain(context.Background(), req.GetInitChain())
|
||||||
|
if err != nil {
|
||||||
|
cli.err = err
|
||||||
|
}
|
||||||
|
return cli.finishAsyncCall(req, &types.Response{&types.Response_InitChain{res}})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) BeginBlockAsync(height uint64) *ReqRes {
|
||||||
|
req := types.ToRequestBeginBlock(height)
|
||||||
|
res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock())
|
||||||
|
if err != nil {
|
||||||
|
cli.err = err
|
||||||
|
}
|
||||||
|
return cli.finishAsyncCall(req, &types.Response{&types.Response_BeginBlock{res}})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes {
|
||||||
|
req := types.ToRequestEndBlock(height)
|
||||||
|
res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock())
|
||||||
|
if err != nil {
|
||||||
|
cli.err = err
|
||||||
|
}
|
||||||
|
return cli.finishAsyncCall(req, &types.Response{&types.Response_EndBlock{res}})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
|
||||||
|
reqres := NewReqRes(req)
|
||||||
|
reqres.Response = res // Set response
|
||||||
|
reqres.Done() // Release waiters
|
||||||
|
|
||||||
|
// Notify reqRes listener if set
|
||||||
|
if cb := reqres.GetCallback(); cb != nil {
|
||||||
|
cb(res)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Notify client listener if set
|
||||||
|
if cli.resCb != nil {
|
||||||
|
cli.resCb(reqres.Request, res)
|
||||||
|
}
|
||||||
|
return reqres
|
||||||
|
}
|
||||||
|
|
||||||
|
//----------------------------------------
|
||||||
|
|
||||||
|
func (cli *grpcClient) EchoSync(msg string) (res types.Result) {
|
||||||
|
r := cli.EchoAsync(msg).Response.GetEcho()
|
||||||
|
return types.NewResultOK([]byte(r.Message), LOG)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) FlushSync() error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) InfoSync() (res types.Result) {
|
||||||
|
r := cli.InfoAsync().Response.GetInfo()
|
||||||
|
return types.NewResultOK([]byte(r.Info), LOG)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) {
|
||||||
|
reqres := cli.SetOptionAsync(key, value)
|
||||||
|
if cli.err != nil {
|
||||||
|
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||||
|
}
|
||||||
|
resp := reqres.Response.GetSetOption()
|
||||||
|
return types.Result{Code: OK, Data: nil, Log: resp.Log}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) {
|
||||||
|
reqres := cli.AppendTxAsync(tx)
|
||||||
|
if cli.err != nil {
|
||||||
|
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||||
|
}
|
||||||
|
resp := reqres.Response.GetAppendTx()
|
||||||
|
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) {
|
||||||
|
reqres := cli.CheckTxAsync(tx)
|
||||||
|
if cli.err != nil {
|
||||||
|
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||||
|
}
|
||||||
|
resp := reqres.Response.GetCheckTx()
|
||||||
|
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) QuerySync(query []byte) (res types.Result) {
|
||||||
|
reqres := cli.QueryAsync(query)
|
||||||
|
if cli.err != nil {
|
||||||
|
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||||
|
}
|
||||||
|
resp := reqres.Response.GetQuery()
|
||||||
|
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) CommitSync() (res types.Result) {
|
||||||
|
reqres := cli.CommitAsync()
|
||||||
|
if cli.err != nil {
|
||||||
|
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||||
|
}
|
||||||
|
resp := reqres.Response.GetCommit()
|
||||||
|
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) InitChainSync(validators []*types.Validator) (err error) {
|
||||||
|
cli.InitChainAsync(validators)
|
||||||
|
if cli.err != nil {
|
||||||
|
return cli.err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) BeginBlockSync(height uint64) (err error) {
|
||||||
|
cli.BeginBlockAsync(height)
|
||||||
|
if cli.err != nil {
|
||||||
|
return cli.err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cli *grpcClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) {
|
||||||
|
reqres := cli.EndBlockAsync(height)
|
||||||
|
if cli.err != nil {
|
||||||
|
return nil, cli.err
|
||||||
|
}
|
||||||
|
return reqres.Response.GetEndBlock().Diffs, nil
|
||||||
|
}
|
@ -38,13 +38,13 @@ func (app *localClient) Stop() bool {
|
|||||||
|
|
||||||
func (app *localClient) FlushAsync() *ReqRes {
|
func (app *localClient) FlushAsync() *ReqRes {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
return newLocalReqRes(types.RequestFlush(), nil)
|
return newLocalReqRes(types.ToRequestFlush(), nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (app *localClient) EchoAsync(msg string) *ReqRes {
|
func (app *localClient) EchoAsync(msg string) *ReqRes {
|
||||||
return app.callback(
|
return app.callback(
|
||||||
types.RequestEcho(msg),
|
types.ToRequestEcho(msg),
|
||||||
types.ResponseEcho(msg),
|
types.ToResponseEcho(msg),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -53,8 +53,8 @@ func (app *localClient) InfoAsync() *ReqRes {
|
|||||||
info := app.Application.Info()
|
info := app.Application.Info()
|
||||||
app.mtx.Unlock()
|
app.mtx.Unlock()
|
||||||
return app.callback(
|
return app.callback(
|
||||||
types.RequestInfo(),
|
types.ToRequestInfo(),
|
||||||
types.ResponseInfo(info),
|
types.ToResponseInfo(info),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -63,8 +63,8 @@ func (app *localClient) SetOptionAsync(key string, value string) *ReqRes {
|
|||||||
log := app.Application.SetOption(key, value)
|
log := app.Application.SetOption(key, value)
|
||||||
app.mtx.Unlock()
|
app.mtx.Unlock()
|
||||||
return app.callback(
|
return app.callback(
|
||||||
types.RequestSetOption(key, value),
|
types.ToRequestSetOption(key, value),
|
||||||
types.ResponseSetOption(log),
|
types.ToResponseSetOption(log),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,8 +73,8 @@ func (app *localClient) AppendTxAsync(tx []byte) *ReqRes {
|
|||||||
res := app.Application.AppendTx(tx)
|
res := app.Application.AppendTx(tx)
|
||||||
app.mtx.Unlock()
|
app.mtx.Unlock()
|
||||||
return app.callback(
|
return app.callback(
|
||||||
types.RequestAppendTx(tx),
|
types.ToRequestAppendTx(tx),
|
||||||
types.ResponseAppendTx(res.Code, res.Data, res.Log),
|
types.ToResponseAppendTx(res.Code, res.Data, res.Log),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -83,8 +83,8 @@ func (app *localClient) CheckTxAsync(tx []byte) *ReqRes {
|
|||||||
res := app.Application.CheckTx(tx)
|
res := app.Application.CheckTx(tx)
|
||||||
app.mtx.Unlock()
|
app.mtx.Unlock()
|
||||||
return app.callback(
|
return app.callback(
|
||||||
types.RequestCheckTx(tx),
|
types.ToRequestCheckTx(tx),
|
||||||
types.ResponseCheckTx(res.Code, res.Data, res.Log),
|
types.ToResponseCheckTx(res.Code, res.Data, res.Log),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -93,8 +93,8 @@ func (app *localClient) QueryAsync(tx []byte) *ReqRes {
|
|||||||
res := app.Application.Query(tx)
|
res := app.Application.Query(tx)
|
||||||
app.mtx.Unlock()
|
app.mtx.Unlock()
|
||||||
return app.callback(
|
return app.callback(
|
||||||
types.RequestQuery(tx),
|
types.ToRequestQuery(tx),
|
||||||
types.ResponseQuery(res.Code, res.Data, res.Log),
|
types.ToResponseQuery(res.Code, res.Data, res.Log),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -103,8 +103,8 @@ func (app *localClient) CommitAsync() *ReqRes {
|
|||||||
res := app.Application.Commit()
|
res := app.Application.Commit()
|
||||||
app.mtx.Unlock()
|
app.mtx.Unlock()
|
||||||
return app.callback(
|
return app.callback(
|
||||||
types.RequestCommit(),
|
types.ToRequestCommit(),
|
||||||
types.ResponseCommit(res.Code, res.Data, res.Log),
|
types.ToResponseCommit(res.Code, res.Data, res.Log),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -114,8 +114,8 @@ func (app *localClient) InitChainAsync(validators []*types.Validator) *ReqRes {
|
|||||||
bcApp.InitChain(validators)
|
bcApp.InitChain(validators)
|
||||||
}
|
}
|
||||||
reqRes := app.callback(
|
reqRes := app.callback(
|
||||||
types.RequestInitChain(validators),
|
types.ToRequestInitChain(validators),
|
||||||
types.ResponseInitChain(),
|
types.ToResponseInitChain(),
|
||||||
)
|
)
|
||||||
app.mtx.Unlock()
|
app.mtx.Unlock()
|
||||||
return reqRes
|
return reqRes
|
||||||
@ -128,8 +128,8 @@ func (app *localClient) BeginBlockAsync(height uint64) *ReqRes {
|
|||||||
}
|
}
|
||||||
app.mtx.Unlock()
|
app.mtx.Unlock()
|
||||||
return app.callback(
|
return app.callback(
|
||||||
types.RequestBeginBlock(height),
|
types.ToRequestBeginBlock(height),
|
||||||
types.ResponseBeginBlock(),
|
types.ToResponseBeginBlock(),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -141,8 +141,8 @@ func (app *localClient) EndBlockAsync(height uint64) *ReqRes {
|
|||||||
}
|
}
|
||||||
app.mtx.Unlock()
|
app.mtx.Unlock()
|
||||||
return app.callback(
|
return app.callback(
|
||||||
types.RequestEndBlock(height),
|
types.ToRequestEndBlock(height),
|
||||||
types.ResponseEndBlock(validators),
|
types.ToResponseEndBlock(validators),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"reflect"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -13,6 +14,11 @@ import (
|
|||||||
"github.com/tendermint/tmsp/types"
|
"github.com/tendermint/tmsp/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
OK = types.CodeType_OK
|
||||||
|
LOG = ""
|
||||||
|
)
|
||||||
|
|
||||||
const reqQueueSize = 256 // TODO make configurable
|
const reqQueueSize = 256 // TODO make configurable
|
||||||
const maxResponseSize = 1048576 // 1MB TODO make configurable
|
const maxResponseSize = 1048576 // 1MB TODO make configurable
|
||||||
const flushThrottleMS = 20 // Don't wait longer than...
|
const flushThrottleMS = 20 // Don't wait longer than...
|
||||||
@ -36,7 +42,7 @@ type remoteClient struct {
|
|||||||
resCb func(*types.Request, *types.Response) // listens to all callbacks
|
resCb func(*types.Request, *types.Response) // listens to all callbacks
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewClient(addr string, mustConnect bool) (*remoteClient, error) {
|
func NewSocketClient(addr string, mustConnect bool) (*remoteClient, error) {
|
||||||
cli := &remoteClient{
|
cli := &remoteClient{
|
||||||
reqQueue: make(chan *ReqRes, reqQueueSize),
|
reqQueue: make(chan *ReqRes, reqQueueSize),
|
||||||
flushTimer: NewThrottleTimer("remoteClient", flushThrottleMS),
|
flushTimer: NewThrottleTimer("remoteClient", flushThrottleMS),
|
||||||
@ -48,39 +54,28 @@ func NewClient(addr string, mustConnect bool) (*remoteClient, error) {
|
|||||||
}
|
}
|
||||||
cli.QuitService = *NewQuitService(nil, "remoteClient", cli)
|
cli.QuitService = *NewQuitService(nil, "remoteClient", cli)
|
||||||
_, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
|
_, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
|
||||||
if mustConnect {
|
return cli, err
|
||||||
return nil, err
|
|
||||||
} else {
|
|
||||||
return cli, nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) OnStart() (err error) {
|
func (cli *remoteClient) OnStart() error {
|
||||||
cli.QuitService.OnStart()
|
cli.QuitService.OnStart()
|
||||||
doneCh := make(chan struct{})
|
RETRY_LOOP:
|
||||||
go func() {
|
for {
|
||||||
RETRY_LOOP:
|
conn, err := Connect(cli.addr)
|
||||||
for {
|
if err != nil {
|
||||||
conn, err_ := Connect(cli.addr)
|
if cli.mustConnect {
|
||||||
if err_ != nil {
|
return err
|
||||||
if cli.mustConnect {
|
} else {
|
||||||
err = err_ // OnStart() will return this.
|
fmt.Printf("tmsp.remoteClient failed to connect to %v. Retrying...\n", cli.addr)
|
||||||
close(doneCh)
|
time.Sleep(time.Second * 3)
|
||||||
return
|
continue RETRY_LOOP
|
||||||
} else {
|
|
||||||
fmt.Printf("tmsp.remoteClient failed to connect to %v. Retrying...\n", cli.addr)
|
|
||||||
time.Sleep(time.Second * 3)
|
|
||||||
continue RETRY_LOOP
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
go cli.sendRequestsRoutine(conn)
|
|
||||||
go cli.recvResponseRoutine(conn)
|
|
||||||
close(doneCh) // OnStart() will return no error.
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
}()
|
go cli.sendRequestsRoutine(conn)
|
||||||
<-doneCh
|
go cli.recvResponseRoutine(conn)
|
||||||
return // err
|
return err
|
||||||
|
}
|
||||||
|
return nil // never happens
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) OnStop() {
|
func (cli *remoteClient) OnStop() {
|
||||||
@ -122,7 +117,7 @@ func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) {
|
|||||||
select {
|
select {
|
||||||
case <-cli.flushTimer.Ch:
|
case <-cli.flushTimer.Ch:
|
||||||
select {
|
select {
|
||||||
case cli.reqQueue <- NewReqRes(types.RequestFlush()):
|
case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): // cant this block ?
|
||||||
default:
|
default:
|
||||||
// Probably will fill the buffer, or retry later.
|
// Probably will fill the buffer, or retry later.
|
||||||
}
|
}
|
||||||
@ -136,7 +131,7 @@ func (cli *remoteClient) sendRequestsRoutine(conn net.Conn) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
// log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
|
// log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
|
||||||
if reqres.Request.Type == types.MessageType_Flush {
|
if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
|
||||||
err = w.Flush()
|
err = w.Flush()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cli.StopForError(err)
|
cli.StopForError(err)
|
||||||
@ -156,10 +151,10 @@ func (cli *remoteClient) recvResponseRoutine(conn net.Conn) {
|
|||||||
cli.StopForError(err)
|
cli.StopForError(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
switch res.Type {
|
switch r := res.Value.(type) {
|
||||||
case types.MessageType_Exception:
|
case *types.Response_Exception:
|
||||||
// XXX After setting cli.err, release waiters (e.g. reqres.Done())
|
// XXX After setting cli.err, release waiters (e.g. reqres.Done())
|
||||||
cli.StopForError(errors.New(res.Error))
|
cli.StopForError(errors.New(r.Exception.Error))
|
||||||
default:
|
default:
|
||||||
// log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
|
// log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
|
||||||
err := cli.didRecvResponse(res)
|
err := cli.didRecvResponse(res)
|
||||||
@ -183,12 +178,12 @@ func (cli *remoteClient) didRecvResponse(res *types.Response) error {
|
|||||||
// Get the first ReqRes
|
// Get the first ReqRes
|
||||||
next := cli.reqSent.Front()
|
next := cli.reqSent.Front()
|
||||||
if next == nil {
|
if next == nil {
|
||||||
return fmt.Errorf("Unexpected result type %v when nothing expected", res.Type)
|
return fmt.Errorf("Unexpected result type %v when nothing expected", reflect.TypeOf(res.Value))
|
||||||
}
|
}
|
||||||
reqres := next.Value.(*ReqRes)
|
reqres := next.Value.(*ReqRes)
|
||||||
if !resMatchesReq(reqres.Request, res) {
|
if !resMatchesReq(reqres.Request, res) {
|
||||||
return fmt.Errorf("Unexpected result type %v when response to %v expected",
|
return fmt.Errorf("Unexpected result type %v when response to %v expected",
|
||||||
res.Type, reqres.Request.Type)
|
reflect.TypeOf(res.Value), reflect.TypeOf(reqres.Request.Value))
|
||||||
}
|
}
|
||||||
|
|
||||||
reqres.Response = res // Set response
|
reqres.Response = res // Set response
|
||||||
@ -211,128 +206,128 @@ func (cli *remoteClient) didRecvResponse(res *types.Response) error {
|
|||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
|
|
||||||
func (cli *remoteClient) EchoAsync(msg string) *ReqRes {
|
func (cli *remoteClient) EchoAsync(msg string) *ReqRes {
|
||||||
return cli.queueRequest(types.RequestEcho(msg))
|
return cli.queueRequest(types.ToRequestEcho(msg))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) FlushAsync() *ReqRes {
|
func (cli *remoteClient) FlushAsync() *ReqRes {
|
||||||
return cli.queueRequest(types.RequestFlush())
|
return cli.queueRequest(types.ToRequestFlush())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) InfoAsync() *ReqRes {
|
func (cli *remoteClient) InfoAsync() *ReqRes {
|
||||||
return cli.queueRequest(types.RequestInfo())
|
return cli.queueRequest(types.ToRequestInfo())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) SetOptionAsync(key string, value string) *ReqRes {
|
func (cli *remoteClient) SetOptionAsync(key string, value string) *ReqRes {
|
||||||
return cli.queueRequest(types.RequestSetOption(key, value))
|
return cli.queueRequest(types.ToRequestSetOption(key, value))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) AppendTxAsync(tx []byte) *ReqRes {
|
func (cli *remoteClient) AppendTxAsync(tx []byte) *ReqRes {
|
||||||
return cli.queueRequest(types.RequestAppendTx(tx))
|
return cli.queueRequest(types.ToRequestAppendTx(tx))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) CheckTxAsync(tx []byte) *ReqRes {
|
func (cli *remoteClient) CheckTxAsync(tx []byte) *ReqRes {
|
||||||
return cli.queueRequest(types.RequestCheckTx(tx))
|
return cli.queueRequest(types.ToRequestCheckTx(tx))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) QueryAsync(query []byte) *ReqRes {
|
func (cli *remoteClient) QueryAsync(query []byte) *ReqRes {
|
||||||
return cli.queueRequest(types.RequestQuery(query))
|
return cli.queueRequest(types.ToRequestQuery(query))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) CommitAsync() *ReqRes {
|
func (cli *remoteClient) CommitAsync() *ReqRes {
|
||||||
return cli.queueRequest(types.RequestCommit())
|
return cli.queueRequest(types.ToRequestCommit())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) InitChainAsync(validators []*types.Validator) *ReqRes {
|
func (cli *remoteClient) InitChainAsync(validators []*types.Validator) *ReqRes {
|
||||||
return cli.queueRequest(types.RequestInitChain(validators))
|
return cli.queueRequest(types.ToRequestInitChain(validators))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) BeginBlockAsync(height uint64) *ReqRes {
|
func (cli *remoteClient) BeginBlockAsync(height uint64) *ReqRes {
|
||||||
return cli.queueRequest(types.RequestBeginBlock(height))
|
return cli.queueRequest(types.ToRequestBeginBlock(height))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) EndBlockAsync(height uint64) *ReqRes {
|
func (cli *remoteClient) EndBlockAsync(height uint64) *ReqRes {
|
||||||
return cli.queueRequest(types.RequestEndBlock(height))
|
return cli.queueRequest(types.ToRequestEndBlock(height))
|
||||||
}
|
}
|
||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
|
|
||||||
func (cli *remoteClient) EchoSync(msg string) (res types.Result) {
|
func (cli *remoteClient) EchoSync(msg string) (res types.Result) {
|
||||||
reqres := cli.queueRequest(types.RequestEcho(msg))
|
reqres := cli.queueRequest(types.ToRequestEcho(msg))
|
||||||
cli.FlushSync()
|
cli.FlushSync()
|
||||||
if cli.err != nil {
|
if cli.err != nil {
|
||||||
return types.ErrInternalError.SetLog(cli.err.Error())
|
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||||
}
|
}
|
||||||
resp := reqres.Response
|
resp := reqres.Response.GetEcho()
|
||||||
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
return types.Result{Code: OK, Data: []byte(resp.Message), Log: LOG}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) FlushSync() error {
|
func (cli *remoteClient) FlushSync() error {
|
||||||
cli.queueRequest(types.RequestFlush()).Wait()
|
cli.queueRequest(types.ToRequestFlush()).Wait()
|
||||||
return cli.err
|
return cli.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) InfoSync() (res types.Result) {
|
func (cli *remoteClient) InfoSync() (res types.Result) {
|
||||||
reqres := cli.queueRequest(types.RequestInfo())
|
reqres := cli.queueRequest(types.ToRequestInfo())
|
||||||
cli.FlushSync()
|
cli.FlushSync()
|
||||||
if cli.err != nil {
|
if cli.err != nil {
|
||||||
return types.ErrInternalError.SetLog(cli.err.Error())
|
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||||
}
|
}
|
||||||
resp := reqres.Response
|
resp := reqres.Response.GetInfo()
|
||||||
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
return types.Result{Code: OK, Data: []byte(resp.Info), Log: LOG}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) SetOptionSync(key string, value string) (res types.Result) {
|
func (cli *remoteClient) SetOptionSync(key string, value string) (res types.Result) {
|
||||||
reqres := cli.queueRequest(types.RequestSetOption(key, value))
|
reqres := cli.queueRequest(types.ToRequestSetOption(key, value))
|
||||||
cli.FlushSync()
|
cli.FlushSync()
|
||||||
if cli.err != nil {
|
if cli.err != nil {
|
||||||
return types.ErrInternalError.SetLog(cli.err.Error())
|
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||||
}
|
}
|
||||||
resp := reqres.Response
|
resp := reqres.Response.GetSetOption()
|
||||||
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
return types.Result{Code: OK, Data: nil, Log: resp.Log}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) AppendTxSync(tx []byte) (res types.Result) {
|
func (cli *remoteClient) AppendTxSync(tx []byte) (res types.Result) {
|
||||||
reqres := cli.queueRequest(types.RequestAppendTx(tx))
|
reqres := cli.queueRequest(types.ToRequestAppendTx(tx))
|
||||||
cli.FlushSync()
|
cli.FlushSync()
|
||||||
if cli.err != nil {
|
if cli.err != nil {
|
||||||
return types.ErrInternalError.SetLog(cli.err.Error())
|
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||||
}
|
}
|
||||||
resp := reqres.Response
|
resp := reqres.Response.GetAppendTx()
|
||||||
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) CheckTxSync(tx []byte) (res types.Result) {
|
func (cli *remoteClient) CheckTxSync(tx []byte) (res types.Result) {
|
||||||
reqres := cli.queueRequest(types.RequestCheckTx(tx))
|
reqres := cli.queueRequest(types.ToRequestCheckTx(tx))
|
||||||
cli.FlushSync()
|
cli.FlushSync()
|
||||||
if cli.err != nil {
|
if cli.err != nil {
|
||||||
return types.ErrInternalError.SetLog(cli.err.Error())
|
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||||
}
|
}
|
||||||
resp := reqres.Response
|
resp := reqres.Response.GetCheckTx()
|
||||||
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) QuerySync(query []byte) (res types.Result) {
|
func (cli *remoteClient) QuerySync(query []byte) (res types.Result) {
|
||||||
reqres := cli.queueRequest(types.RequestQuery(query))
|
reqres := cli.queueRequest(types.ToRequestQuery(query))
|
||||||
cli.FlushSync()
|
cli.FlushSync()
|
||||||
if cli.err != nil {
|
if cli.err != nil {
|
||||||
return types.ErrInternalError.SetLog(cli.err.Error())
|
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||||
}
|
}
|
||||||
resp := reqres.Response
|
resp := reqres.Response.GetQuery()
|
||||||
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) CommitSync() (res types.Result) {
|
func (cli *remoteClient) CommitSync() (res types.Result) {
|
||||||
reqres := cli.queueRequest(types.RequestCommit())
|
reqres := cli.queueRequest(types.ToRequestCommit())
|
||||||
cli.FlushSync()
|
cli.FlushSync()
|
||||||
if cli.err != nil {
|
if cli.err != nil {
|
||||||
return types.ErrInternalError.SetLog(cli.err.Error())
|
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||||
}
|
}
|
||||||
resp := reqres.Response
|
resp := reqres.Response.GetCommit()
|
||||||
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) InitChainSync(validators []*types.Validator) (err error) {
|
func (cli *remoteClient) InitChainSync(validators []*types.Validator) (err error) {
|
||||||
cli.queueRequest(types.RequestInitChain(validators))
|
cli.queueRequest(types.ToRequestInitChain(validators))
|
||||||
cli.FlushSync()
|
cli.FlushSync()
|
||||||
if cli.err != nil {
|
if cli.err != nil {
|
||||||
return cli.err
|
return cli.err
|
||||||
@ -341,7 +336,7 @@ func (cli *remoteClient) InitChainSync(validators []*types.Validator) (err error
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) BeginBlockSync(height uint64) (err error) {
|
func (cli *remoteClient) BeginBlockSync(height uint64) (err error) {
|
||||||
cli.queueRequest(types.RequestBeginBlock(height))
|
cli.queueRequest(types.ToRequestBeginBlock(height))
|
||||||
cli.FlushSync()
|
cli.FlushSync()
|
||||||
if cli.err != nil {
|
if cli.err != nil {
|
||||||
return cli.err
|
return cli.err
|
||||||
@ -350,24 +345,25 @@ func (cli *remoteClient) BeginBlockSync(height uint64) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cli *remoteClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) {
|
func (cli *remoteClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) {
|
||||||
reqres := cli.queueRequest(types.RequestEndBlock(height))
|
reqres := cli.queueRequest(types.ToRequestEndBlock(height))
|
||||||
cli.FlushSync()
|
cli.FlushSync()
|
||||||
if cli.err != nil {
|
if cli.err != nil {
|
||||||
return nil, cli.err
|
return nil, cli.err
|
||||||
}
|
}
|
||||||
return reqres.Response.Validators, nil
|
return reqres.Response.GetEndBlock().Diffs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
|
|
||||||
func (cli *remoteClient) queueRequest(req *types.Request) *ReqRes {
|
func (cli *remoteClient) queueRequest(req *types.Request) *ReqRes {
|
||||||
reqres := NewReqRes(req)
|
reqres := NewReqRes(req)
|
||||||
|
|
||||||
// TODO: set cli.err if reqQueue times out
|
// TODO: set cli.err if reqQueue times out
|
||||||
cli.reqQueue <- reqres
|
cli.reqQueue <- reqres
|
||||||
|
|
||||||
// Maybe auto-flush, or unset auto-flush
|
// Maybe auto-flush, or unset auto-flush
|
||||||
switch req.Type {
|
switch req.Value.(type) {
|
||||||
case types.MessageType_Flush:
|
case *types.Request_Flush:
|
||||||
cli.flushTimer.Unset()
|
cli.flushTimer.Unset()
|
||||||
default:
|
default:
|
||||||
cli.flushTimer.Set()
|
cli.flushTimer.Set()
|
||||||
@ -379,5 +375,27 @@ func (cli *remoteClient) queueRequest(req *types.Request) *ReqRes {
|
|||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
|
|
||||||
func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
|
func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
|
||||||
return req.Type == res.Type
|
switch req.Value.(type) {
|
||||||
|
case *types.Request_Echo:
|
||||||
|
_, ok = res.Value.(*types.Response_Echo)
|
||||||
|
case *types.Request_Flush:
|
||||||
|
_, ok = res.Value.(*types.Response_Flush)
|
||||||
|
case *types.Request_Info:
|
||||||
|
_, ok = res.Value.(*types.Response_Info)
|
||||||
|
case *types.Request_SetOption:
|
||||||
|
_, ok = res.Value.(*types.Response_SetOption)
|
||||||
|
case *types.Request_AppendTx:
|
||||||
|
_, ok = res.Value.(*types.Response_AppendTx)
|
||||||
|
case *types.Request_CheckTx:
|
||||||
|
_, ok = res.Value.(*types.Response_CheckTx)
|
||||||
|
case *types.Request_Commit:
|
||||||
|
_, ok = res.Value.(*types.Response_Commit)
|
||||||
|
case *types.Request_Query:
|
||||||
|
_, ok = res.Value.(*types.Response_Query)
|
||||||
|
case *types.Request_InitChain:
|
||||||
|
_, ok = res.Value.(*types.Response_InitChain)
|
||||||
|
case *types.Request_EndBlock:
|
||||||
|
_, ok = res.Value.(*types.Response_EndBlock)
|
||||||
|
}
|
||||||
|
return ok
|
||||||
}
|
}
|
||||||
|
@ -11,12 +11,13 @@ import (
|
|||||||
func main() {
|
func main() {
|
||||||
|
|
||||||
addrPtr := flag.String("addr", "tcp://0.0.0.0:46658", "Listen address")
|
addrPtr := flag.String("addr", "tcp://0.0.0.0:46658", "Listen address")
|
||||||
|
tmspPtr := flag.String("tmsp", "socket", "TMSP server: socket | grpc")
|
||||||
serialPtr := flag.Bool("serial", false, "Enforce incrementing (serial) txs")
|
serialPtr := flag.Bool("serial", false, "Enforce incrementing (serial) txs")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
app := counter.NewCounterApplication(*serialPtr)
|
app := counter.NewCounterApplication(*serialPtr)
|
||||||
|
|
||||||
// Start the listener
|
// Start the listener
|
||||||
_, err := server.NewServer(*addrPtr, app)
|
_, err := server.NewServer(*addrPtr, *tmspPtr, app)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Exit(err.Error())
|
Exit(err.Error())
|
||||||
}
|
}
|
||||||
|
@ -11,10 +11,11 @@ import (
|
|||||||
func main() {
|
func main() {
|
||||||
|
|
||||||
addrPtr := flag.String("addr", "tcp://0.0.0.0:46658", "Listen address")
|
addrPtr := flag.String("addr", "tcp://0.0.0.0:46658", "Listen address")
|
||||||
|
tmspPtr := flag.String("tmsp", "socket", "socket | grpc")
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
// Start the listener
|
// Start the listener
|
||||||
_, err := server.NewServer(*addrPtr, dummy.NewDummyApplication())
|
_, err := server.NewServer(*addrPtr, *tmspPtr, dummy.NewDummyApplication())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Exit(err.Error())
|
Exit(err.Error())
|
||||||
}
|
}
|
||||||
|
@ -5,18 +5,18 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/codegangsta/cli"
|
"github.com/codegangsta/cli"
|
||||||
. "github.com/tendermint/go-common"
|
. "github.com/tendermint/go-common"
|
||||||
"github.com/tendermint/go-wire/expr"
|
"github.com/tendermint/go-wire/expr"
|
||||||
|
"github.com/tendermint/tmsp/client"
|
||||||
"github.com/tendermint/tmsp/types"
|
"github.com/tendermint/tmsp/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// connection is a global variable so it can be reused by the console
|
// client is a global variable so it can be reused by the console
|
||||||
var conn net.Conn
|
var client tmspcli.Client
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
app := cli.NewApp()
|
app := cli.NewApp()
|
||||||
@ -28,6 +28,11 @@ func main() {
|
|||||||
Value: "tcp://127.0.0.1:46658",
|
Value: "tcp://127.0.0.1:46658",
|
||||||
Usage: "address of application socket",
|
Usage: "address of application socket",
|
||||||
},
|
},
|
||||||
|
cli.StringFlag{
|
||||||
|
Name: "tmsp",
|
||||||
|
Value: "socket",
|
||||||
|
Usage: "socket or grpc",
|
||||||
|
},
|
||||||
}
|
}
|
||||||
app.Commands = []cli.Command{
|
app.Commands = []cli.Command{
|
||||||
{
|
{
|
||||||
@ -103,9 +108,9 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func before(c *cli.Context) error {
|
func before(c *cli.Context) error {
|
||||||
if conn == nil {
|
if client == nil {
|
||||||
var err error
|
var err error
|
||||||
conn, err = Connect(c.GlobalString("address"))
|
client, err = tmspcli.NewClient(c.GlobalString("address"), c.GlobalString("tmsp"), false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Exit(err.Error())
|
Exit(err.Error())
|
||||||
}
|
}
|
||||||
@ -148,7 +153,9 @@ func cmdConsole(app *cli.App, c *cli.Context) error {
|
|||||||
|
|
||||||
args := []string{"tmsp"}
|
args := []string{"tmsp"}
|
||||||
args = append(args, strings.Split(string(line), " ")...)
|
args = append(args, strings.Split(string(line), " ")...)
|
||||||
app.Run(args)
|
if err := app.Run(args); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -159,21 +166,15 @@ func cmdEcho(c *cli.Context) error {
|
|||||||
if len(args) != 1 {
|
if len(args) != 1 {
|
||||||
return errors.New("Command echo takes 1 argument")
|
return errors.New("Command echo takes 1 argument")
|
||||||
}
|
}
|
||||||
res, err := makeRequest(conn, types.RequestEcho(args[0]))
|
res := client.EchoSync(args[0])
|
||||||
if err != nil {
|
printResponse(res, string(res.Data), false)
|
||||||
return err
|
|
||||||
}
|
|
||||||
printResponse(res, string(res.Data))
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get some info from the application
|
// Get some info from the application
|
||||||
func cmdInfo(c *cli.Context) error {
|
func cmdInfo(c *cli.Context) error {
|
||||||
res, err := makeRequest(conn, types.RequestInfo())
|
res := client.InfoSync()
|
||||||
if err != nil {
|
printResponse(res, string(res.Data), false)
|
||||||
return err
|
|
||||||
}
|
|
||||||
printResponse(res, string(res.Data))
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -183,11 +184,8 @@ func cmdSetOption(c *cli.Context) error {
|
|||||||
if len(args) != 2 {
|
if len(args) != 2 {
|
||||||
return errors.New("Command set_option takes 2 arguments (key, value)")
|
return errors.New("Command set_option takes 2 arguments (key, value)")
|
||||||
}
|
}
|
||||||
res, err := makeRequest(conn, types.RequestSetOption(args[0], args[1]))
|
res := client.SetOptionSync(args[0], args[1])
|
||||||
if err != nil {
|
printResponse(res, Fmt("%s=%s", args[0], args[1]), false)
|
||||||
return err
|
|
||||||
}
|
|
||||||
printResponse(res, Fmt("%s=%s", args[0], args[1]))
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -203,11 +201,8 @@ func cmdAppendTx(c *cli.Context) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := makeRequest(conn, types.RequestAppendTx(txBytes))
|
res := client.AppendTxSync(txBytes)
|
||||||
if err != nil {
|
printResponse(res, string(res.Data), true)
|
||||||
return err
|
|
||||||
}
|
|
||||||
printResponse(res, string(res.Data))
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -223,21 +218,15 @@ func cmdCheckTx(c *cli.Context) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := makeRequest(conn, types.RequestCheckTx(txBytes))
|
res := client.CheckTxSync(txBytes)
|
||||||
if err != nil {
|
printResponse(res, string(res.Data), true)
|
||||||
return err
|
|
||||||
}
|
|
||||||
printResponse(res, string(res.Data))
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get application Merkle root hash
|
// Get application Merkle root hash
|
||||||
func cmdCommit(c *cli.Context) error {
|
func cmdCommit(c *cli.Context) error {
|
||||||
res, err := makeRequest(conn, types.RequestCommit())
|
res := client.CommitSync()
|
||||||
if err != nil {
|
printResponse(res, Fmt("%X", res.Data), false)
|
||||||
return err
|
|
||||||
}
|
|
||||||
printResponse(res, Fmt("%X", res.Data))
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -253,24 +242,20 @@ func cmdQuery(c *cli.Context) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := makeRequest(conn, types.RequestQuery(queryBytes))
|
res := client.QuerySync(queryBytes)
|
||||||
if err != nil {
|
printResponse(res, string(res.Data), true)
|
||||||
return err
|
|
||||||
}
|
|
||||||
printResponse(res, string(res.Data))
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
//--------------------------------------------------------------------------------
|
//--------------------------------------------------------------------------------
|
||||||
|
|
||||||
func printResponse(res *types.Response, s string) {
|
func printResponse(res types.Result, s string, printCode bool) {
|
||||||
switch res.Type {
|
if printCode {
|
||||||
case types.MessageType_AppendTx, types.MessageType_CheckTx, types.MessageType_Query:
|
|
||||||
fmt.Printf("-> code: %s\n", res.Code.String())
|
fmt.Printf("-> code: %s\n", res.Code.String())
|
||||||
}
|
}
|
||||||
if res.Error != "" {
|
/*if res.Error != "" {
|
||||||
fmt.Printf("-> error: %s\n", res.Error)
|
fmt.Printf("-> error: %s\n", res.Error)
|
||||||
}
|
}*/
|
||||||
if s != "" {
|
if s != "" {
|
||||||
fmt.Printf("-> data: {%s}\n", s)
|
fmt.Printf("-> data: {%s}\n", s)
|
||||||
}
|
}
|
||||||
@ -279,41 +264,3 @@ func printResponse(res *types.Response, s string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func responseString(res *types.Response) string {
|
|
||||||
return Fmt("type: %v\tdata: %v\tcode: %v", res.Type, res.Data, res.Code)
|
|
||||||
}
|
|
||||||
|
|
||||||
func makeRequest(conn net.Conn, req *types.Request) (*types.Response, error) {
|
|
||||||
|
|
||||||
// Write desired request
|
|
||||||
err := types.WriteMessage(req, conn)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write flush request
|
|
||||||
err = types.WriteMessage(types.RequestFlush(), conn)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read desired response
|
|
||||||
var res = &types.Response{}
|
|
||||||
err = types.ReadMessage(conn, res)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read flush response
|
|
||||||
var resFlush = &types.Response{}
|
|
||||||
err = types.ReadMessage(conn, resFlush)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if resFlush.Type != types.MessageType_Flush {
|
|
||||||
return nil, errors.New(Fmt("Expected types.MessageType_Flush but got %v instead", resFlush.Type))
|
|
||||||
}
|
|
||||||
|
|
||||||
return res, nil
|
|
||||||
}
|
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package dummy
|
package dummy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -14,7 +15,7 @@ func TestStream(t *testing.T) {
|
|||||||
numAppendTxs := 200000
|
numAppendTxs := 200000
|
||||||
|
|
||||||
// Start the listener
|
// Start the listener
|
||||||
server, err := server.NewServer("unix://test.sock", NewDummyApplication())
|
server, err := server.NewSocketServer("unix://test.sock", NewDummyApplication())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Exit(err.Error())
|
Exit(err.Error())
|
||||||
}
|
}
|
||||||
@ -39,11 +40,11 @@ func TestStream(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Process response
|
// Process response
|
||||||
switch res.Type {
|
switch r := res.Value.(type) {
|
||||||
case types.MessageType_AppendTx:
|
case *types.Response_AppendTx:
|
||||||
counter += 1
|
counter += 1
|
||||||
if res.Code != types.CodeType_OK {
|
if r.AppendTx.Code != types.CodeType_OK {
|
||||||
t.Error("AppendTx failed with ret_code", res.Code)
|
t.Error("AppendTx failed with ret_code", r.AppendTx.Code)
|
||||||
}
|
}
|
||||||
if counter > numAppendTxs {
|
if counter > numAppendTxs {
|
||||||
t.Fatal("Too many AppendTx responses")
|
t.Fatal("Too many AppendTx responses")
|
||||||
@ -55,10 +56,10 @@ func TestStream(t *testing.T) {
|
|||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
case types.MessageType_Flush:
|
case *types.Response_Flush:
|
||||||
// ignore
|
// ignore
|
||||||
default:
|
default:
|
||||||
t.Error("Unexpected response type", res.Type)
|
t.Error("Unexpected response type", reflect.TypeOf(res.Value))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -66,7 +67,7 @@ func TestStream(t *testing.T) {
|
|||||||
// Write requests
|
// Write requests
|
||||||
for counter := 0; counter < numAppendTxs; counter++ {
|
for counter := 0; counter < numAppendTxs; counter++ {
|
||||||
// Send request
|
// Send request
|
||||||
var req = types.RequestAppendTx([]byte("test"))
|
var req = types.ToRequestAppendTx([]byte("test"))
|
||||||
err := types.WriteMessage(req, conn)
|
err := types.WriteMessage(req, conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err.Error())
|
t.Fatal(err.Error())
|
||||||
@ -75,7 +76,7 @@ func TestStream(t *testing.T) {
|
|||||||
// Sometimes send flush messages
|
// Sometimes send flush messages
|
||||||
if counter%123 == 0 {
|
if counter%123 == 0 {
|
||||||
t.Log("flush")
|
t.Log("flush")
|
||||||
err := types.WriteMessage(types.RequestFlush(), conn)
|
err := types.WriteMessage(types.ToRequestFlush(), conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err.Error())
|
t.Fatal(err.Error())
|
||||||
}
|
}
|
||||||
@ -83,7 +84,7 @@ func TestStream(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send final flush message
|
// Send final flush message
|
||||||
err = types.WriteMessage(types.RequestFlush(), conn)
|
err = types.WriteMessage(types.ToRequestFlush(), conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err.Error())
|
t.Fatal(err.Error())
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,14 @@
|
|||||||
package nilapp
|
package nilapp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"net"
|
||||||
|
"reflect"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
. "github.com/tendermint/go-common"
|
. "github.com/tendermint/go-common"
|
||||||
"github.com/tendermint/tmsp/server"
|
"github.com/tendermint/tmsp/server"
|
||||||
"github.com/tendermint/tmsp/types"
|
"github.com/tendermint/tmsp/types"
|
||||||
@ -14,7 +19,7 @@ func TestStream(t *testing.T) {
|
|||||||
numAppendTxs := 200000
|
numAppendTxs := 200000
|
||||||
|
|
||||||
// Start the listener
|
// Start the listener
|
||||||
server, err := server.NewServer("unix://test.sock", NewNilApplication())
|
server, err := server.NewSocketServer("unix://test.sock", NewNilApplication())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Exit(err.Error())
|
Exit(err.Error())
|
||||||
}
|
}
|
||||||
@ -39,11 +44,11 @@ func TestStream(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Process response
|
// Process response
|
||||||
switch res.Type {
|
switch r := res.Value.(type) {
|
||||||
case types.MessageType_AppendTx:
|
case *types.Response_AppendTx:
|
||||||
counter += 1
|
counter += 1
|
||||||
if res.Code != types.CodeType_OK {
|
if r.AppendTx.Code != types.CodeType_OK {
|
||||||
t.Error("AppendTx failed with ret_code", res.Code)
|
t.Error("AppendTx failed with ret_code", r.AppendTx.Code)
|
||||||
}
|
}
|
||||||
if counter > numAppendTxs {
|
if counter > numAppendTxs {
|
||||||
t.Fatal("Too many AppendTx responses")
|
t.Fatal("Too many AppendTx responses")
|
||||||
@ -55,10 +60,10 @@ func TestStream(t *testing.T) {
|
|||||||
close(done)
|
close(done)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
case types.MessageType_Flush:
|
case *types.Response_Flush:
|
||||||
// ignore
|
// ignore
|
||||||
default:
|
default:
|
||||||
t.Error("Unexpected response type", res.Type)
|
t.Error("Unexpected response type", reflect.TypeOf(res.Value))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -66,7 +71,7 @@ func TestStream(t *testing.T) {
|
|||||||
// Write requests
|
// Write requests
|
||||||
for counter := 0; counter < numAppendTxs; counter++ {
|
for counter := 0; counter < numAppendTxs; counter++ {
|
||||||
// Send request
|
// Send request
|
||||||
var req = types.RequestAppendTx([]byte("test"))
|
var req = types.ToRequestAppendTx([]byte("test"))
|
||||||
err := types.WriteMessage(req, conn)
|
err := types.WriteMessage(req, conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err.Error())
|
t.Fatal(err.Error())
|
||||||
@ -75,7 +80,7 @@ func TestStream(t *testing.T) {
|
|||||||
// Sometimes send flush messages
|
// Sometimes send flush messages
|
||||||
if counter%123 == 0 {
|
if counter%123 == 0 {
|
||||||
t.Log("flush")
|
t.Log("flush")
|
||||||
err := types.WriteMessage(types.RequestFlush(), conn)
|
err := types.WriteMessage(types.ToRequestFlush(), conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err.Error())
|
t.Fatal(err.Error())
|
||||||
}
|
}
|
||||||
@ -83,10 +88,61 @@ func TestStream(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Send final flush message
|
// Send final flush message
|
||||||
err = types.WriteMessage(types.RequestFlush(), conn)
|
err = types.WriteMessage(types.ToRequestFlush(), conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err.Error())
|
t.Fatal(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
<-done
|
<-done
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//-------------------------
|
||||||
|
// test grpc
|
||||||
|
|
||||||
|
func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
|
||||||
|
return Connect(addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGRPCSync(t *testing.T) {
|
||||||
|
|
||||||
|
numAppendTxs := 2000
|
||||||
|
|
||||||
|
// Start the listener
|
||||||
|
server, err := server.NewGRPCServer("unix://test.sock", types.NewGRPCApplication(NewNilApplication()))
|
||||||
|
if err != nil {
|
||||||
|
Exit(err.Error())
|
||||||
|
}
|
||||||
|
defer server.Stop()
|
||||||
|
|
||||||
|
// Connect to the socket
|
||||||
|
conn, err := grpc.Dial("unix://test.sock", grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
|
||||||
|
if err != nil {
|
||||||
|
Exit(err.Error())
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
client := types.NewTMSPApplicationClient(conn)
|
||||||
|
|
||||||
|
// Write requests
|
||||||
|
for counter := 0; counter < numAppendTxs; counter++ {
|
||||||
|
// Send request
|
||||||
|
response, err := client.AppendTx(context.Background(), &types.RequestAppendTx{[]byte("test")})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err.Error())
|
||||||
|
}
|
||||||
|
counter += 1
|
||||||
|
if response.Code != types.CodeType_OK {
|
||||||
|
t.Error("AppendTx failed with ret_code", response.Code)
|
||||||
|
}
|
||||||
|
if counter > numAppendTxs {
|
||||||
|
t.Fatal("Too many AppendTx responses")
|
||||||
|
}
|
||||||
|
t.Log("response", counter)
|
||||||
|
if counter == numAppendTxs {
|
||||||
|
go func() {
|
||||||
|
time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
55
server/grpc_server.go
Normal file
55
server/grpc_server.go
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
|
||||||
|
. "github.com/tendermint/go-common"
|
||||||
|
"github.com/tendermint/tmsp/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// var maxNumberConnections = 2
|
||||||
|
|
||||||
|
type GRPCServer struct {
|
||||||
|
QuitService
|
||||||
|
|
||||||
|
proto string
|
||||||
|
addr string
|
||||||
|
listener net.Listener
|
||||||
|
|
||||||
|
app types.TMSPApplicationServer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewGRPCServer(protoAddr string, app types.TMSPApplicationServer) (Service, error) {
|
||||||
|
parts := strings.SplitN(protoAddr, "://", 2)
|
||||||
|
proto, addr := parts[0], parts[1]
|
||||||
|
s := &GRPCServer{
|
||||||
|
proto: proto,
|
||||||
|
addr: addr,
|
||||||
|
listener: nil,
|
||||||
|
app: app,
|
||||||
|
}
|
||||||
|
s.QuitService = *NewQuitService(nil, "TMSPServer", s)
|
||||||
|
_, err := s.Start() // Just start it
|
||||||
|
return s, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *GRPCServer) OnStart() error {
|
||||||
|
s.QuitService.OnStart()
|
||||||
|
ln, err := net.Listen(s.proto, s.addr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.listener = ln
|
||||||
|
grpcServer := grpc.NewServer()
|
||||||
|
types.RegisterTMSPApplicationServer(grpcServer, s.app)
|
||||||
|
go grpcServer.Serve(ln)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *GRPCServer) OnStop() {
|
||||||
|
s.QuitService.OnStop()
|
||||||
|
s.listener.Close()
|
||||||
|
}
|
188
server/server.go
188
server/server.go
@ -1,190 +1,22 @@
|
|||||||
package server
|
package server
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
|
|
||||||
. "github.com/tendermint/go-common"
|
. "github.com/tendermint/go-common"
|
||||||
"github.com/tendermint/tmsp/types"
|
"github.com/tendermint/tmsp/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// var maxNumberConnections = 2
|
func NewServer(protoAddr, transport string, app types.Application) (Service, error) {
|
||||||
|
var s Service
|
||||||
type Server struct {
|
var err error
|
||||||
QuitService
|
switch transport {
|
||||||
|
case "socket":
|
||||||
proto string
|
s, err = NewSocketServer(protoAddr, app)
|
||||||
addr string
|
case "grpc":
|
||||||
listener net.Listener
|
s, err = NewGRPCServer(protoAddr, types.NewGRPCApplication(app))
|
||||||
|
default:
|
||||||
appMtx sync.Mutex
|
err = fmt.Errorf("Unknown server type %s", transport)
|
||||||
app types.Application
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewServer(protoAddr string, app types.Application) (*Server, error) {
|
|
||||||
parts := strings.SplitN(protoAddr, "://", 2)
|
|
||||||
proto, addr := parts[0], parts[1]
|
|
||||||
s := &Server{
|
|
||||||
proto: proto,
|
|
||||||
addr: addr,
|
|
||||||
listener: nil,
|
|
||||||
app: app,
|
|
||||||
}
|
}
|
||||||
s.QuitService = *NewQuitService(nil, "TMSPServer", s)
|
|
||||||
_, err := s.Start() // Just start it
|
|
||||||
return s, err
|
return s, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) OnStart() error {
|
|
||||||
s.QuitService.OnStart()
|
|
||||||
ln, err := net.Listen(s.proto, s.addr)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
s.listener = ln
|
|
||||||
go s.acceptConnectionsRoutine()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) OnStop() {
|
|
||||||
s.QuitService.OnStop()
|
|
||||||
s.listener.Close()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) acceptConnectionsRoutine() {
|
|
||||||
// semaphore := make(chan struct{}, maxNumberConnections)
|
|
||||||
|
|
||||||
for {
|
|
||||||
// semaphore <- struct{}{}
|
|
||||||
|
|
||||||
// Accept a connection
|
|
||||||
fmt.Println("Waiting for new connection...")
|
|
||||||
conn, err := s.listener.Accept()
|
|
||||||
if err != nil {
|
|
||||||
if !s.IsRunning() {
|
|
||||||
return // Ignore error from listener closing.
|
|
||||||
}
|
|
||||||
Exit("Failed to accept connection: " + err.Error())
|
|
||||||
} else {
|
|
||||||
fmt.Println("Accepted a new connection")
|
|
||||||
}
|
|
||||||
|
|
||||||
closeConn := make(chan error, 2) // Push to signal connection closed
|
|
||||||
responses := make(chan *types.Response, 1000) // A channel to buffer responses
|
|
||||||
|
|
||||||
// Read requests from conn and deal with them
|
|
||||||
go s.handleRequests(closeConn, conn, responses)
|
|
||||||
// Pull responses from 'responses' and write them to conn.
|
|
||||||
go s.handleResponses(closeConn, responses, conn)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
// Wait until signal to close connection
|
|
||||||
errClose := <-closeConn
|
|
||||||
if errClose != nil {
|
|
||||||
fmt.Printf("Connection error: %v\n", errClose)
|
|
||||||
} else {
|
|
||||||
fmt.Println("Connection was closed.")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close the connection
|
|
||||||
err := conn.Close()
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("Error in closing connection: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// <-semaphore
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read requests from conn and deal with them
|
|
||||||
func (s *Server) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
|
|
||||||
var count int
|
|
||||||
var bufReader = bufio.NewReader(conn)
|
|
||||||
for {
|
|
||||||
|
|
||||||
var req = &types.Request{}
|
|
||||||
err := types.ReadMessage(bufReader, req)
|
|
||||||
if err != nil {
|
|
||||||
if err == io.EOF {
|
|
||||||
closeConn <- fmt.Errorf("Connection closed by client")
|
|
||||||
} else {
|
|
||||||
closeConn <- fmt.Errorf("Error in handleRequests: %v", err.Error())
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
s.appMtx.Lock()
|
|
||||||
count++
|
|
||||||
s.handleRequest(req, responses)
|
|
||||||
s.appMtx.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Server) handleRequest(req *types.Request, responses chan<- *types.Response) {
|
|
||||||
switch req.Type {
|
|
||||||
case types.MessageType_Echo:
|
|
||||||
responses <- types.ResponseEcho(string(req.Data))
|
|
||||||
case types.MessageType_Flush:
|
|
||||||
responses <- types.ResponseFlush()
|
|
||||||
case types.MessageType_Info:
|
|
||||||
data := s.app.Info()
|
|
||||||
responses <- types.ResponseInfo(data)
|
|
||||||
case types.MessageType_SetOption:
|
|
||||||
logStr := s.app.SetOption(req.Key, req.Value)
|
|
||||||
responses <- types.ResponseSetOption(logStr)
|
|
||||||
case types.MessageType_AppendTx:
|
|
||||||
res := s.app.AppendTx(req.Data)
|
|
||||||
responses <- types.ResponseAppendTx(res.Code, res.Data, res.Log)
|
|
||||||
case types.MessageType_CheckTx:
|
|
||||||
res := s.app.CheckTx(req.Data)
|
|
||||||
responses <- types.ResponseCheckTx(res.Code, res.Data, res.Log)
|
|
||||||
case types.MessageType_Commit:
|
|
||||||
res := s.app.Commit()
|
|
||||||
responses <- types.ResponseCommit(res.Code, res.Data, res.Log)
|
|
||||||
case types.MessageType_Query:
|
|
||||||
res := s.app.Query(req.Data)
|
|
||||||
responses <- types.ResponseQuery(res.Code, res.Data, res.Log)
|
|
||||||
case types.MessageType_InitChain:
|
|
||||||
if app, ok := s.app.(types.BlockchainAware); ok {
|
|
||||||
app.InitChain(req.Validators)
|
|
||||||
responses <- types.ResponseInitChain()
|
|
||||||
} else {
|
|
||||||
responses <- types.ResponseInitChain()
|
|
||||||
}
|
|
||||||
case types.MessageType_EndBlock:
|
|
||||||
if app, ok := s.app.(types.BlockchainAware); ok {
|
|
||||||
validators := app.EndBlock(req.Height)
|
|
||||||
responses <- types.ResponseEndBlock(validators)
|
|
||||||
} else {
|
|
||||||
responses <- types.ResponseEndBlock(nil)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
responses <- types.ResponseException("Unknown request")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Pull responses from 'responses' and write them to conn.
|
|
||||||
func (s *Server) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) {
|
|
||||||
var count int
|
|
||||||
var bufWriter = bufio.NewWriter(conn)
|
|
||||||
for {
|
|
||||||
var res = <-responses
|
|
||||||
err := types.WriteMessage(res, bufWriter)
|
|
||||||
if err != nil {
|
|
||||||
closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if res.Type == types.MessageType_Flush {
|
|
||||||
err = bufWriter.Flush()
|
|
||||||
if err != nil {
|
|
||||||
closeConn <- fmt.Errorf("Error in handleResponses: %v", err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
count++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
191
server/socket_server.go
Normal file
191
server/socket_server.go
Normal file
@ -0,0 +1,191 @@
|
|||||||
|
package server
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
. "github.com/tendermint/go-common"
|
||||||
|
"github.com/tendermint/tmsp/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
// var maxNumberConnections = 2
|
||||||
|
|
||||||
|
type SocketServer struct {
|
||||||
|
QuitService
|
||||||
|
|
||||||
|
proto string
|
||||||
|
addr string
|
||||||
|
listener net.Listener
|
||||||
|
|
||||||
|
appMtx sync.Mutex
|
||||||
|
app types.Application
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSocketServer(protoAddr string, app types.Application) (Service, error) {
|
||||||
|
parts := strings.SplitN(protoAddr, "://", 2)
|
||||||
|
proto, addr := parts[0], parts[1]
|
||||||
|
s := &SocketServer{
|
||||||
|
proto: proto,
|
||||||
|
addr: addr,
|
||||||
|
listener: nil,
|
||||||
|
app: app,
|
||||||
|
}
|
||||||
|
s.QuitService = *NewQuitService(nil, "TMSPServer", s)
|
||||||
|
_, err := s.Start() // Just start it
|
||||||
|
return s, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SocketServer) OnStart() error {
|
||||||
|
s.QuitService.OnStart()
|
||||||
|
ln, err := net.Listen(s.proto, s.addr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
s.listener = ln
|
||||||
|
go s.acceptConnectionsRoutine()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SocketServer) OnStop() {
|
||||||
|
s.QuitService.OnStop()
|
||||||
|
s.listener.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SocketServer) acceptConnectionsRoutine() {
|
||||||
|
// semaphore := make(chan struct{}, maxNumberConnections)
|
||||||
|
|
||||||
|
for {
|
||||||
|
// semaphore <- struct{}{}
|
||||||
|
|
||||||
|
// Accept a connection
|
||||||
|
fmt.Println("Waiting for new connection...")
|
||||||
|
conn, err := s.listener.Accept()
|
||||||
|
if err != nil {
|
||||||
|
if !s.IsRunning() {
|
||||||
|
return // Ignore error from listener closing.
|
||||||
|
}
|
||||||
|
Exit("Failed to accept connection: " + err.Error())
|
||||||
|
} else {
|
||||||
|
fmt.Println("Accepted a new connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
closeConn := make(chan error, 2) // Push to signal connection closed
|
||||||
|
responses := make(chan *types.Response, 1000) // A channel to buffer responses
|
||||||
|
|
||||||
|
// Read requests from conn and deal with them
|
||||||
|
go s.handleRequests(closeConn, conn, responses)
|
||||||
|
// Pull responses from 'responses' and write them to conn.
|
||||||
|
go s.handleResponses(closeConn, responses, conn)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
// Wait until signal to close connection
|
||||||
|
errClose := <-closeConn
|
||||||
|
if errClose != nil {
|
||||||
|
fmt.Printf("Connection error: %v\n", errClose)
|
||||||
|
} else {
|
||||||
|
fmt.Println("Connection was closed.")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close the connection
|
||||||
|
err := conn.Close()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Error in closing connection: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// <-semaphore
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read requests from conn and deal with them
|
||||||
|
func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
|
||||||
|
var count int
|
||||||
|
var bufReader = bufio.NewReader(conn)
|
||||||
|
for {
|
||||||
|
|
||||||
|
var req = &types.Request{}
|
||||||
|
err := types.ReadMessage(bufReader, req)
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
closeConn <- fmt.Errorf("Connection closed by client")
|
||||||
|
} else {
|
||||||
|
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.appMtx.Lock()
|
||||||
|
count++
|
||||||
|
s.handleRequest(req, responses)
|
||||||
|
s.appMtx.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) {
|
||||||
|
switch r := req.Value.(type) {
|
||||||
|
case *types.Request_Echo:
|
||||||
|
responses <- types.ToResponseEcho(r.Echo.Message)
|
||||||
|
case *types.Request_Flush:
|
||||||
|
responses <- types.ToResponseFlush()
|
||||||
|
case *types.Request_Info:
|
||||||
|
data := s.app.Info()
|
||||||
|
responses <- types.ToResponseInfo(data)
|
||||||
|
case *types.Request_SetOption:
|
||||||
|
so := r.SetOption
|
||||||
|
logStr := s.app.SetOption(so.Key, so.Value)
|
||||||
|
responses <- types.ToResponseSetOption(logStr)
|
||||||
|
case *types.Request_AppendTx:
|
||||||
|
res := s.app.AppendTx(r.AppendTx.Tx)
|
||||||
|
responses <- types.ToResponseAppendTx(res.Code, res.Data, res.Log)
|
||||||
|
case *types.Request_CheckTx:
|
||||||
|
res := s.app.CheckTx(r.CheckTx.Tx)
|
||||||
|
responses <- types.ToResponseCheckTx(res.Code, res.Data, res.Log)
|
||||||
|
case *types.Request_Commit:
|
||||||
|
res := s.app.Commit()
|
||||||
|
responses <- types.ToResponseCommit(res.Code, res.Data, res.Log)
|
||||||
|
case *types.Request_Query:
|
||||||
|
res := s.app.Query(r.Query.Query)
|
||||||
|
responses <- types.ToResponseQuery(res.Code, res.Data, res.Log)
|
||||||
|
case *types.Request_InitChain:
|
||||||
|
if app, ok := s.app.(types.BlockchainAware); ok {
|
||||||
|
app.InitChain(r.InitChain.Validators)
|
||||||
|
responses <- types.ToResponseInitChain()
|
||||||
|
} else {
|
||||||
|
responses <- types.ToResponseInitChain()
|
||||||
|
}
|
||||||
|
case *types.Request_EndBlock:
|
||||||
|
if app, ok := s.app.(types.BlockchainAware); ok {
|
||||||
|
validators := app.EndBlock(r.EndBlock.Height)
|
||||||
|
responses <- types.ToResponseEndBlock(validators)
|
||||||
|
} else {
|
||||||
|
responses <- types.ToResponseEndBlock(nil)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
responses <- types.ToResponseException("Unknown request")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Pull responses from 'responses' and write them to conn.
|
||||||
|
func (s *SocketServer) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) {
|
||||||
|
var count int
|
||||||
|
var bufWriter = bufio.NewWriter(conn)
|
||||||
|
for {
|
||||||
|
var res = <-responses
|
||||||
|
err := types.WriteMessage(res, bufWriter)
|
||||||
|
if err != nil {
|
||||||
|
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, ok := res.Value.(*types.Response_Flush); ok {
|
||||||
|
err = bufWriter.Flush()
|
||||||
|
if err != nil {
|
||||||
|
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
}
|
@ -36,7 +36,7 @@ func main() {
|
|||||||
counter := 0
|
counter := 0
|
||||||
for i := 0; ; i++ {
|
for i := 0; ; i++ {
|
||||||
var bufWriter = bufio.NewWriter(conn)
|
var bufWriter = bufio.NewWriter(conn)
|
||||||
var req = types.RequestEcho("foobar")
|
var req = types.ToRequestEcho("foobar")
|
||||||
|
|
||||||
err := types.WriteMessage(req, bufWriter)
|
err := types.WriteMessage(req, bufWriter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"reflect"
|
||||||
//"encoding/hex"
|
//"encoding/hex"
|
||||||
|
|
||||||
. "github.com/tendermint/go-common"
|
. "github.com/tendermint/go-common"
|
||||||
@ -21,7 +22,7 @@ func main() {
|
|||||||
// Make a bunch of requests
|
// Make a bunch of requests
|
||||||
counter := 0
|
counter := 0
|
||||||
for i := 0; ; i++ {
|
for i := 0; ; i++ {
|
||||||
req := types.RequestEcho("foobar")
|
req := types.ToRequestEcho("foobar")
|
||||||
_, err := makeRequest(conn, req)
|
_, err := makeRequest(conn, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Exit(err.Error())
|
Exit(err.Error())
|
||||||
@ -41,7 +42,7 @@ func makeRequest(conn net.Conn, req *types.Request) (*types.Response, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err = types.WriteMessage(types.RequestFlush(), bufWriter)
|
err = types.WriteMessage(types.ToRequestFlush(), bufWriter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -61,8 +62,8 @@ func makeRequest(conn net.Conn, req *types.Request) (*types.Response, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if resFlush.Type != types.MessageType_Flush {
|
if _, ok := resFlush.Value.(*types.Response_Flush); !ok {
|
||||||
return nil, errors.New(Fmt("Expected flush response but got something else: %v", resFlush.Type))
|
return nil, errors.New(Fmt("Expected flush response but got something else: %v", reflect.TypeOf(resFlush)))
|
||||||
}
|
}
|
||||||
|
|
||||||
return res, nil
|
return res, nil
|
||||||
|
@ -5,5 +5,8 @@ cd $ROOT
|
|||||||
# test golang counter
|
# test golang counter
|
||||||
COUNTER_APP="counter" go run $ROOT/tests/test_counter.go
|
COUNTER_APP="counter" go run $ROOT/tests/test_counter.go
|
||||||
|
|
||||||
|
# test golang counter via grpc
|
||||||
|
COUNTER_APP="counter -tmsp=grpc" go run $ROOT/tests/test_counter.go -tmsp=grpc
|
||||||
|
|
||||||
# test nodejs counter
|
# test nodejs counter
|
||||||
COUNTER_APP="node ../js-tmsp/example/app.js" go run $ROOT/tests/test_counter.go
|
COUNTER_APP="node ../js-tmsp/example/app.js" go run $ROOT/tests/test_counter.go
|
||||||
|
@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
@ -12,7 +13,10 @@ import (
|
|||||||
"github.com/tendermint/tmsp/types"
|
"github.com/tendermint/tmsp/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var tmspPtr = flag.String("tmsp", "socket", "socket or grpc")
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
flag.Parse()
|
||||||
|
|
||||||
// Run tests
|
// Run tests
|
||||||
testBasic()
|
testBasic()
|
||||||
@ -70,7 +74,7 @@ func startApp() *process.Process {
|
|||||||
|
|
||||||
func startClient() tmspcli.Client {
|
func startClient() tmspcli.Client {
|
||||||
// Start client
|
// Start client
|
||||||
client, err := tmspcli.NewClient("tcp://127.0.0.1:46658", true)
|
client, err := tmspcli.NewClient("tcp://127.0.0.1:46658", *tmspPtr, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic("connecting to counter_app: " + err.Error())
|
panic("connecting to counter_app: " + err.Error())
|
||||||
}
|
}
|
||||||
@ -100,9 +104,6 @@ func commit(client tmspcli.Client, hashExp []byte) {
|
|||||||
func appendTx(client tmspcli.Client, txBytes []byte, codeExp types.CodeType, dataExp []byte) {
|
func appendTx(client tmspcli.Client, txBytes []byte, codeExp types.CodeType, dataExp []byte) {
|
||||||
res := client.AppendTxSync(txBytes)
|
res := client.AppendTxSync(txBytes)
|
||||||
code, data, log := res.Code, res.Data, res.Log
|
code, data, log := res.Code, res.Data, res.Log
|
||||||
if res.IsErr() {
|
|
||||||
panic(Fmt("appending tx %X: %v\nlog: %v", txBytes, log))
|
|
||||||
}
|
|
||||||
if code != codeExp {
|
if code != codeExp {
|
||||||
panic(Fmt("AppendTx response code was unexpected. Got %v expected %v. Log: %v",
|
panic(Fmt("AppendTx response code was unexpected. Got %v expected %v. Log: %v",
|
||||||
code, codeExp, log))
|
code, codeExp, log))
|
||||||
|
@ -1,5 +1,9 @@
|
|||||||
package types
|
package types
|
||||||
|
|
||||||
|
import (
|
||||||
|
context "golang.org/x/net/context"
|
||||||
|
)
|
||||||
|
|
||||||
// Applications
|
// Applications
|
||||||
type Application interface {
|
type Application interface {
|
||||||
|
|
||||||
@ -36,3 +40,70 @@ type BlockchainAware interface {
|
|||||||
// diffs: changed validators from app to TendermintCore
|
// diffs: changed validators from app to TendermintCore
|
||||||
EndBlock(height uint64) (diffs []*Validator)
|
EndBlock(height uint64) (diffs []*Validator)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//------------------------------------
|
||||||
|
type GRPCApplication struct {
|
||||||
|
app Application
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewGRPCApplication(app Application) *GRPCApplication {
|
||||||
|
return &GRPCApplication{app}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *GRPCApplication) Echo(ctx context.Context, req *RequestEcho) (*ResponseEcho, error) {
|
||||||
|
return &ResponseEcho{req.Message}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *GRPCApplication) Flush(ctx context.Context, req *RequestFlush) (*ResponseFlush, error) {
|
||||||
|
return &ResponseFlush{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *GRPCApplication) Info(ctx context.Context, req *RequestInfo) (*ResponseInfo, error) {
|
||||||
|
return &ResponseInfo{app.app.Info()}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *GRPCApplication) SetOption(ctx context.Context, req *RequestSetOption) (*ResponseSetOption, error) {
|
||||||
|
return &ResponseSetOption{app.app.SetOption(req.Key, req.Value)}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *GRPCApplication) AppendTx(ctx context.Context, req *RequestAppendTx) (*ResponseAppendTx, error) {
|
||||||
|
r := app.app.AppendTx(req.Tx)
|
||||||
|
return &ResponseAppendTx{r.Code, r.Data, r.Log}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *GRPCApplication) CheckTx(ctx context.Context, req *RequestCheckTx) (*ResponseCheckTx, error) {
|
||||||
|
r := app.app.CheckTx(req.Tx)
|
||||||
|
return &ResponseCheckTx{r.Code, r.Data, r.Log}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *GRPCApplication) Query(ctx context.Context, req *RequestQuery) (*ResponseQuery, error) {
|
||||||
|
r := app.app.Query(req.Query)
|
||||||
|
return &ResponseQuery{r.Code, r.Data, r.Log}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *GRPCApplication) Commit(ctx context.Context, req *RequestCommit) (*ResponseCommit, error) {
|
||||||
|
r := app.app.Commit()
|
||||||
|
return &ResponseCommit{r.Code, r.Data, r.Log}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *GRPCApplication) InitChain(ctx context.Context, req *RequestInitChain) (*ResponseInitChain, error) {
|
||||||
|
if chainAware, ok := app.app.(BlockchainAware); ok {
|
||||||
|
chainAware.InitChain(req.Validators)
|
||||||
|
}
|
||||||
|
return &ResponseInitChain{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *GRPCApplication) BeginBlock(ctx context.Context, req *RequestBeginBlock) (*ResponseBeginBlock, error) {
|
||||||
|
if chainAware, ok := app.app.(BlockchainAware); ok {
|
||||||
|
chainAware.BeginBlock(req.Height)
|
||||||
|
}
|
||||||
|
return &ResponseBeginBlock{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (app *GRPCApplication) EndBlock(ctx context.Context, req *RequestEndBlock) (*ResponseEndBlock, error) {
|
||||||
|
if chainAware, ok := app.app.(BlockchainAware); ok {
|
||||||
|
diffs := chainAware.EndBlock(req.Height)
|
||||||
|
return &ResponseEndBlock{diffs}, nil
|
||||||
|
}
|
||||||
|
return &ResponseEndBlock{}, nil
|
||||||
|
}
|
||||||
|
@ -7,169 +7,143 @@ import (
|
|||||||
"github.com/tendermint/go-wire"
|
"github.com/tendermint/go-wire"
|
||||||
)
|
)
|
||||||
|
|
||||||
func RequestEcho(message string) *Request {
|
func ToRequestEcho(message string) *Request {
|
||||||
return &Request{
|
return &Request{
|
||||||
Type: MessageType_Echo,
|
Value: &Request_Echo{&RequestEcho{message}},
|
||||||
Data: []byte(message),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func RequestFlush() *Request {
|
func ToRequestFlush() *Request {
|
||||||
return &Request{
|
return &Request{
|
||||||
Type: MessageType_Flush,
|
Value: &Request_Flush{&RequestFlush{}},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func RequestInfo() *Request {
|
func ToRequestInfo() *Request {
|
||||||
return &Request{
|
return &Request{
|
||||||
Type: MessageType_Info,
|
Value: &Request_Info{&RequestInfo{}},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func RequestSetOption(key string, value string) *Request {
|
func ToRequestSetOption(key string, value string) *Request {
|
||||||
return &Request{
|
return &Request{
|
||||||
Type: MessageType_SetOption,
|
Value: &Request_SetOption{&RequestSetOption{key, value}},
|
||||||
Key: key,
|
|
||||||
Value: value,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func RequestAppendTx(txBytes []byte) *Request {
|
func ToRequestAppendTx(txBytes []byte) *Request {
|
||||||
return &Request{
|
return &Request{
|
||||||
Type: MessageType_AppendTx,
|
Value: &Request_AppendTx{&RequestAppendTx{txBytes}},
|
||||||
Data: txBytes,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func RequestCheckTx(txBytes []byte) *Request {
|
func ToRequestCheckTx(txBytes []byte) *Request {
|
||||||
return &Request{
|
return &Request{
|
||||||
Type: MessageType_CheckTx,
|
Value: &Request_CheckTx{&RequestCheckTx{txBytes}},
|
||||||
Data: txBytes,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func RequestCommit() *Request {
|
func ToRequestCommit() *Request {
|
||||||
return &Request{
|
return &Request{
|
||||||
Type: MessageType_Commit,
|
Value: &Request_Commit{&RequestCommit{}},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func RequestQuery(queryBytes []byte) *Request {
|
func ToRequestQuery(queryBytes []byte) *Request {
|
||||||
return &Request{
|
return &Request{
|
||||||
Type: MessageType_Query,
|
Value: &Request_Query{&RequestQuery{queryBytes}},
|
||||||
Data: queryBytes,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func RequestInitChain(validators []*Validator) *Request {
|
func ToRequestInitChain(validators []*Validator) *Request {
|
||||||
return &Request{
|
return &Request{
|
||||||
Type: MessageType_InitChain,
|
Value: &Request_InitChain{&RequestInitChain{validators}},
|
||||||
Validators: validators,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func RequestBeginBlock(height uint64) *Request {
|
func ToRequestBeginBlock(height uint64) *Request {
|
||||||
return &Request{
|
return &Request{
|
||||||
Type: MessageType_BeginBlock,
|
Value: &Request_BeginBlock{&RequestBeginBlock{height}},
|
||||||
Height: height,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func RequestEndBlock(height uint64) *Request {
|
func ToRequestEndBlock(height uint64) *Request {
|
||||||
return &Request{
|
return &Request{
|
||||||
Type: MessageType_EndBlock,
|
Value: &Request_EndBlock{&RequestEndBlock{height}},
|
||||||
Height: height,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
|
|
||||||
func ResponseException(errStr string) *Response {
|
func ToResponseException(errStr string) *Response {
|
||||||
return &Response{
|
return &Response{
|
||||||
Type: MessageType_Exception,
|
Value: &Response_Exception{&ResponseException{errStr}},
|
||||||
Error: errStr,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ResponseEcho(message string) *Response {
|
func ToResponseEcho(message string) *Response {
|
||||||
return &Response{
|
return &Response{
|
||||||
Type: MessageType_Echo,
|
Value: &Response_Echo{&ResponseEcho{message}},
|
||||||
Data: []byte(message),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ResponseFlush() *Response {
|
func ToResponseFlush() *Response {
|
||||||
return &Response{
|
return &Response{
|
||||||
Type: MessageType_Flush,
|
Value: &Response_Flush{&ResponseFlush{}},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ResponseInfo(info string) *Response {
|
func ToResponseInfo(info string) *Response {
|
||||||
return &Response{
|
return &Response{
|
||||||
Type: MessageType_Info,
|
Value: &Response_Info{&ResponseInfo{info}},
|
||||||
Data: []byte(info),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ResponseSetOption(log string) *Response {
|
func ToResponseSetOption(log string) *Response {
|
||||||
return &Response{
|
return &Response{
|
||||||
Type: MessageType_SetOption,
|
Value: &Response_SetOption{&ResponseSetOption{log}},
|
||||||
Log: log,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ResponseAppendTx(code CodeType, data []byte, log string) *Response {
|
func ToResponseAppendTx(code CodeType, data []byte, log string) *Response {
|
||||||
return &Response{
|
return &Response{
|
||||||
Type: MessageType_AppendTx,
|
Value: &Response_AppendTx{&ResponseAppendTx{code, data, log}},
|
||||||
Code: code,
|
|
||||||
Data: data,
|
|
||||||
Log: log,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ResponseCheckTx(code CodeType, data []byte, log string) *Response {
|
func ToResponseCheckTx(code CodeType, data []byte, log string) *Response {
|
||||||
return &Response{
|
return &Response{
|
||||||
Type: MessageType_CheckTx,
|
Value: &Response_CheckTx{&ResponseCheckTx{code, data, log}},
|
||||||
Code: code,
|
|
||||||
Data: data,
|
|
||||||
Log: log,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ResponseCommit(code CodeType, data []byte, log string) *Response {
|
func ToResponseCommit(code CodeType, data []byte, log string) *Response {
|
||||||
return &Response{
|
return &Response{
|
||||||
Type: MessageType_Commit,
|
Value: &Response_Commit{&ResponseCommit{code, data, log}},
|
||||||
Code: code,
|
|
||||||
Data: data,
|
|
||||||
Log: log,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ResponseQuery(code CodeType, data []byte, log string) *Response {
|
func ToResponseQuery(code CodeType, data []byte, log string) *Response {
|
||||||
return &Response{
|
return &Response{
|
||||||
Type: MessageType_Query,
|
Value: &Response_Query{&ResponseQuery{code, data, log}},
|
||||||
Code: code,
|
|
||||||
Data: data,
|
|
||||||
Log: log,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ResponseInitChain() *Response {
|
func ToResponseInitChain() *Response {
|
||||||
return &Response{
|
return &Response{
|
||||||
Type: MessageType_InitChain,
|
Value: &Response_InitChain{&ResponseInitChain{}},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ResponseBeginBlock() *Response {
|
func ToResponseBeginBlock() *Response {
|
||||||
return &Response{
|
return &Response{
|
||||||
Type: MessageType_BeginBlock,
|
Value: &Response_BeginBlock{&ResponseBeginBlock{}},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ResponseEndBlock(validators []*Validator) *Response {
|
func ToResponseEndBlock(validators []*Validator) *Response {
|
||||||
return &Response{
|
return &Response{
|
||||||
Type: MessageType_EndBlock,
|
Value: &Response_EndBlock{&ResponseEndBlock{validators}},
|
||||||
Validators: validators,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
1648
types/types.pb.go
1648
types/types.pb.go
File diff suppressed because it is too large
Load Diff
@ -6,6 +6,10 @@ package types;
|
|||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
// Message types
|
// Message types
|
||||||
|
|
||||||
|
// Not being used
|
||||||
|
// Could be added to request/response
|
||||||
|
// so we don't have to type switch
|
||||||
|
// (would be twice as fast, but we're talking about 15ns)
|
||||||
enum MessageType {
|
enum MessageType {
|
||||||
NullMessage = 0x00;
|
NullMessage = 0x00;
|
||||||
|
|
||||||
@ -70,24 +74,136 @@ enum CodeType {
|
|||||||
// Request types
|
// Request types
|
||||||
|
|
||||||
message Request {
|
message Request {
|
||||||
MessageType type = 1;
|
oneof value{
|
||||||
bytes data = 2;
|
RequestEcho echo = 1;
|
||||||
string key = 3;
|
RequestFlush flush = 2;
|
||||||
string value = 4;
|
RequestInfo info = 3;
|
||||||
repeated Validator validators = 5;
|
RequestSetOption set_option = 4;
|
||||||
uint64 height = 6;
|
RequestAppendTx append_tx = 5;
|
||||||
|
RequestCheckTx check_tx = 6;
|
||||||
|
RequestCommit commit = 7;
|
||||||
|
RequestQuery query = 8;
|
||||||
|
RequestInitChain init_chain = 9;
|
||||||
|
RequestBeginBlock begin_block = 10;
|
||||||
|
RequestEndBlock end_block = 11;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
message RequestEcho {
|
||||||
|
string message = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message RequestFlush {
|
||||||
|
}
|
||||||
|
|
||||||
|
message RequestInfo {
|
||||||
|
}
|
||||||
|
|
||||||
|
message RequestSetOption{
|
||||||
|
string key = 1;
|
||||||
|
string value = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message RequestAppendTx{
|
||||||
|
bytes tx = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message RequestCheckTx{
|
||||||
|
bytes tx = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message RequestQuery{
|
||||||
|
bytes query = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message RequestCommit{
|
||||||
|
}
|
||||||
|
|
||||||
|
message RequestInitChain{
|
||||||
|
repeated Validator validators = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message RequestBeginBlock{
|
||||||
|
uint64 height = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message RequestEndBlock{
|
||||||
|
uint64 height = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
// Response types
|
// Response types
|
||||||
|
|
||||||
|
|
||||||
message Response {
|
message Response {
|
||||||
MessageType type = 1;
|
oneof value{
|
||||||
bytes data = 2;
|
ResponseException exception = 1;
|
||||||
CodeType code = 3;
|
ResponseEcho echo = 2;
|
||||||
string error = 4;
|
ResponseFlush flush = 3;
|
||||||
string log = 5;
|
ResponseInfo info = 4;
|
||||||
repeated Validator validators = 6;
|
ResponseSetOption set_option = 5;
|
||||||
|
ResponseAppendTx append_tx = 6;
|
||||||
|
ResponseCheckTx check_tx = 7;
|
||||||
|
ResponseCommit commit = 8;
|
||||||
|
ResponseQuery query = 9;
|
||||||
|
ResponseInitChain init_chain = 10;
|
||||||
|
ResponseBeginBlock begin_block = 11;
|
||||||
|
ResponseEndBlock end_block = 12;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
message ResponseException{
|
||||||
|
string error = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ResponseEcho {
|
||||||
|
string message = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ResponseFlush{
|
||||||
|
}
|
||||||
|
|
||||||
|
message ResponseInfo {
|
||||||
|
string info = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ResponseSetOption{
|
||||||
|
string log = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ResponseAppendTx{
|
||||||
|
CodeType code = 1;
|
||||||
|
bytes data = 2;
|
||||||
|
string log = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ResponseCheckTx{
|
||||||
|
CodeType code = 1;
|
||||||
|
bytes data = 2;
|
||||||
|
string log = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ResponseQuery{
|
||||||
|
CodeType code = 1;
|
||||||
|
bytes data = 2;
|
||||||
|
string log = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ResponseCommit{
|
||||||
|
CodeType code = 1;
|
||||||
|
bytes data = 2;
|
||||||
|
string log = 3;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
message ResponseInitChain{
|
||||||
|
}
|
||||||
|
|
||||||
|
message ResponseBeginBlock{
|
||||||
|
}
|
||||||
|
|
||||||
|
message ResponseEndBlock{
|
||||||
|
repeated Validator diffs = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
//----------------------------------------
|
//----------------------------------------
|
||||||
@ -97,3 +213,20 @@ message Validator {
|
|||||||
bytes pubKey = 1;
|
bytes pubKey = 1;
|
||||||
uint64 power = 2;
|
uint64 power = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//----------------------------------------
|
||||||
|
// Service Definition
|
||||||
|
|
||||||
|
service TMSPApplication {
|
||||||
|
rpc Echo(RequestEcho) returns (ResponseEcho) ;
|
||||||
|
rpc Flush(RequestFlush) returns (ResponseFlush);
|
||||||
|
rpc Info(RequestInfo) returns (ResponseInfo);
|
||||||
|
rpc SetOption(RequestSetOption) returns (ResponseSetOption);
|
||||||
|
rpc AppendTx(RequestAppendTx) returns (ResponseAppendTx);
|
||||||
|
rpc CheckTx(RequestCheckTx) returns (ResponseCheckTx);
|
||||||
|
rpc Query(RequestQuery) returns (ResponseQuery);
|
||||||
|
rpc Commit(RequestCommit) returns (ResponseCommit);
|
||||||
|
rpc InitChain(RequestInitChain) returns (ResponseInitChain);
|
||||||
|
rpc BeginBlock(RequestBeginBlock) returns (ResponseBeginBlock);
|
||||||
|
rpc EndBlock(RequestEndBlock) returns (ResponseEndBlock);
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user