mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-13 21:31:23 +00:00
silly grpc client
This commit is contained in:
@ -1,8 +1,10 @@
|
||||
package tmspcli
|
||||
|
||||
import (
|
||||
"github.com/tendermint/tmsp/types"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/tendermint/tmsp/types"
|
||||
)
|
||||
|
||||
type Client interface {
|
||||
@ -39,6 +41,21 @@ type Client interface {
|
||||
|
||||
//----------------------------------------
|
||||
|
||||
func NewClient(addr, transport string, mustConnect bool) (client Client, err error) {
|
||||
switch transport {
|
||||
case "socket":
|
||||
client, err = NewSocketClient(addr, mustConnect)
|
||||
case "grpc":
|
||||
client, err = NewGRPCClient(addr, mustConnect)
|
||||
default:
|
||||
err = fmt.Errorf("Unknown tmsp transport %s", transport)
|
||||
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
|
||||
type Callback func(*types.Request, *types.Response)
|
||||
|
||||
//----------------------------------------
|
||||
|
298
client/grpc_client.go
Normal file
298
client/grpc_client.go
Normal file
@ -0,0 +1,298 @@
|
||||
package tmspcli
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
context "golang.org/x/net/context"
|
||||
grpc "google.golang.org/grpc"
|
||||
|
||||
. "github.com/tendermint/go-common"
|
||||
"github.com/tendermint/tmsp/types"
|
||||
)
|
||||
|
||||
// This is goroutine-safe, but users should beware that
|
||||
// the application in general is not meant to be interfaced
|
||||
// with concurrent callers.
|
||||
type grpcClient struct {
|
||||
QuitService
|
||||
mustConnect bool
|
||||
|
||||
client types.TMSPApplicationClient
|
||||
|
||||
mtx sync.Mutex
|
||||
addr string
|
||||
err error
|
||||
resCb func(*types.Request, *types.Response) // listens to all callbacks
|
||||
}
|
||||
|
||||
func NewGRPCClient(addr string, mustConnect bool) (*grpcClient, error) {
|
||||
cli := &grpcClient{
|
||||
addr: addr,
|
||||
mustConnect: mustConnect,
|
||||
}
|
||||
cli.QuitService = *NewQuitService(nil, "grpcClient", cli)
|
||||
_, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
|
||||
return cli, err
|
||||
}
|
||||
|
||||
func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
|
||||
return Connect(addr)
|
||||
}
|
||||
|
||||
func (cli *grpcClient) OnStart() (err error) {
|
||||
cli.QuitService.OnStart()
|
||||
RETRY_LOOP:
|
||||
for {
|
||||
conn, err_ := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
|
||||
if err_ != nil {
|
||||
if cli.mustConnect {
|
||||
err = err_ // OnStart() will return this.
|
||||
return
|
||||
} else {
|
||||
fmt.Printf("tmsp.grpcClient failed to connect to %v. Retrying...\n", cli.addr)
|
||||
time.Sleep(time.Second * 3)
|
||||
continue RETRY_LOOP
|
||||
}
|
||||
}
|
||||
cli.client = types.NewTMSPApplicationClient(conn)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (cli *grpcClient) OnStop() {
|
||||
cli.QuitService.OnStop()
|
||||
// TODO: close client (?)
|
||||
}
|
||||
|
||||
// Set listener for all responses
|
||||
// NOTE: callback may get internally generated flush responses.
|
||||
func (cli *grpcClient) SetResponseCallback(resCb Callback) {
|
||||
cli.mtx.Lock()
|
||||
defer cli.mtx.Unlock()
|
||||
cli.resCb = resCb
|
||||
}
|
||||
|
||||
func (cli *grpcClient) StopForError(err error) {
|
||||
cli.mtx.Lock()
|
||||
fmt.Printf("Stopping tmsp.grpcClient for error: %v\n", err.Error())
|
||||
if cli.err == nil {
|
||||
cli.err = err
|
||||
}
|
||||
cli.mtx.Unlock()
|
||||
cli.Stop()
|
||||
}
|
||||
|
||||
func (cli *grpcClient) Error() error {
|
||||
cli.mtx.Lock()
|
||||
defer cli.mtx.Unlock()
|
||||
return cli.err
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
// async calls are really sync.
|
||||
// maybe one day, if people really want it, we use grpc streams,
|
||||
// but hopefully not :D
|
||||
|
||||
func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
|
||||
reqres := NewReqRes(req)
|
||||
reqres.Response = res // Set response
|
||||
reqres.Done() // Release waiters
|
||||
|
||||
// Notify reqRes listener if set
|
||||
if cb := reqres.GetCallback(); cb != nil {
|
||||
cb(res)
|
||||
}
|
||||
|
||||
// Notify client listener if set
|
||||
if cli.resCb != nil {
|
||||
cli.resCb(reqres.Request, res)
|
||||
}
|
||||
return reqres
|
||||
}
|
||||
|
||||
func (cli *grpcClient) EchoAsync(msg string) *ReqRes {
|
||||
req := types.ToRequestEcho(msg)
|
||||
res, err := cli.client.Echo(context.Background(), req.GetEcho())
|
||||
if err != nil {
|
||||
cli.err = err
|
||||
}
|
||||
return cli.finishAsyncCall(req, &types.Response{&types.Response_Echo{res}})
|
||||
}
|
||||
|
||||
func (cli *grpcClient) FlushAsync() *ReqRes {
|
||||
req := types.ToRequestFlush()
|
||||
res, err := cli.client.Flush(context.Background(), req.GetFlush())
|
||||
if err != nil {
|
||||
cli.err = err
|
||||
}
|
||||
return cli.finishAsyncCall(req, &types.Response{&types.Response_Flush{res}})
|
||||
}
|
||||
|
||||
func (cli *grpcClient) InfoAsync() *ReqRes {
|
||||
req := types.ToRequestInfo()
|
||||
res, err := cli.client.Info(context.Background(), req.GetInfo())
|
||||
if err != nil {
|
||||
cli.err = err
|
||||
}
|
||||
return cli.finishAsyncCall(req, &types.Response{&types.Response_Info{res}})
|
||||
}
|
||||
|
||||
func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes {
|
||||
req := types.ToRequestSetOption(key, value)
|
||||
res, err := cli.client.SetOption(context.Background(), req.GetSetOption())
|
||||
if err != nil {
|
||||
cli.err = err
|
||||
}
|
||||
return cli.finishAsyncCall(req, &types.Response{&types.Response_SetOption{res}})
|
||||
}
|
||||
|
||||
func (cli *grpcClient) AppendTxAsync(tx []byte) *ReqRes {
|
||||
req := types.ToRequestAppendTx(tx)
|
||||
res, err := cli.client.AppendTx(context.Background(), req.GetAppendTx())
|
||||
if err != nil {
|
||||
cli.err = err
|
||||
}
|
||||
return cli.finishAsyncCall(req, &types.Response{&types.Response_AppendTx{res}})
|
||||
}
|
||||
|
||||
func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes {
|
||||
req := types.ToRequestCheckTx(tx)
|
||||
res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx())
|
||||
if err != nil {
|
||||
cli.err = err
|
||||
}
|
||||
return cli.finishAsyncCall(req, &types.Response{&types.Response_CheckTx{res}})
|
||||
}
|
||||
|
||||
func (cli *grpcClient) QueryAsync(query []byte) *ReqRes {
|
||||
req := types.ToRequestQuery(query)
|
||||
res, err := cli.client.Query(context.Background(), req.GetQuery())
|
||||
if err != nil {
|
||||
cli.err = err
|
||||
}
|
||||
return cli.finishAsyncCall(req, &types.Response{&types.Response_Query{res}})
|
||||
}
|
||||
|
||||
func (cli *grpcClient) CommitAsync() *ReqRes {
|
||||
req := types.ToRequestCommit()
|
||||
res, err := cli.client.Commit(context.Background(), req.GetCommit())
|
||||
if err != nil {
|
||||
cli.err = err
|
||||
}
|
||||
return cli.finishAsyncCall(req, &types.Response{&types.Response_Commit{res}})
|
||||
}
|
||||
|
||||
func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes {
|
||||
req := types.ToRequestInitChain(validators)
|
||||
res, err := cli.client.InitChain(context.Background(), req.GetInitChain())
|
||||
if err != nil {
|
||||
cli.err = err
|
||||
}
|
||||
return cli.finishAsyncCall(req, &types.Response{&types.Response_InitChain{res}})
|
||||
}
|
||||
|
||||
func (cli *grpcClient) BeginBlockAsync(height uint64) *ReqRes {
|
||||
req := types.ToRequestBeginBlock(height)
|
||||
res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock())
|
||||
if err != nil {
|
||||
cli.err = err
|
||||
}
|
||||
return cli.finishAsyncCall(req, &types.Response{&types.Response_BeginBlock{res}})
|
||||
}
|
||||
|
||||
func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes {
|
||||
req := types.ToRequestEndBlock(height)
|
||||
res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock())
|
||||
if err != nil {
|
||||
cli.err = err
|
||||
}
|
||||
return cli.finishAsyncCall(req, &types.Response{&types.Response_EndBlock{res}})
|
||||
}
|
||||
|
||||
//----------------------------------------
|
||||
|
||||
func (cli *grpcClient) EchoSync(msg string) (res types.Result) {
|
||||
r := cli.EchoAsync(msg).Response.GetEcho()
|
||||
return types.NewResultOK([]byte(r.Message), LOG)
|
||||
}
|
||||
|
||||
func (cli *grpcClient) FlushSync() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cli *grpcClient) InfoSync() (res types.Result) {
|
||||
r := cli.InfoAsync().Response.GetInfo()
|
||||
return types.NewResultOK([]byte(r.Info), LOG)
|
||||
}
|
||||
|
||||
func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) {
|
||||
reqres := cli.SetOptionAsync(key, value)
|
||||
if cli.err != nil {
|
||||
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||
}
|
||||
resp := reqres.Response.GetSetOption()
|
||||
return types.Result{Code: OK, Data: nil, Log: resp.Log}
|
||||
}
|
||||
|
||||
func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) {
|
||||
reqres := cli.AppendTxAsync(tx)
|
||||
if cli.err != nil {
|
||||
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||
}
|
||||
resp := reqres.Response.GetAppendTx()
|
||||
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
||||
}
|
||||
|
||||
func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) {
|
||||
reqres := cli.CheckTxAsync(tx)
|
||||
if cli.err != nil {
|
||||
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||
}
|
||||
resp := reqres.Response.GetCheckTx()
|
||||
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
||||
}
|
||||
|
||||
func (cli *grpcClient) QuerySync(query []byte) (res types.Result) {
|
||||
reqres := cli.QueryAsync(query)
|
||||
if cli.err != nil {
|
||||
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||
}
|
||||
resp := reqres.Response.GetQuery()
|
||||
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
||||
}
|
||||
|
||||
func (cli *grpcClient) CommitSync() (res types.Result) {
|
||||
reqres := cli.CommitAsync()
|
||||
if cli.err != nil {
|
||||
return types.ErrInternalError.SetLog(cli.err.Error())
|
||||
}
|
||||
resp := reqres.Response.GetCommit()
|
||||
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
|
||||
}
|
||||
|
||||
func (cli *grpcClient) InitChainSync(validators []*types.Validator) (err error) {
|
||||
cli.InitChainAsync(validators)
|
||||
if cli.err != nil {
|
||||
return cli.err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cli *grpcClient) BeginBlockSync(height uint64) (err error) {
|
||||
cli.BeginBlockAsync(height)
|
||||
if cli.err != nil {
|
||||
return cli.err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cli *grpcClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) {
|
||||
reqres := cli.EndBlockAsync(height)
|
||||
if cli.err != nil {
|
||||
return nil, cli.err
|
||||
}
|
||||
return reqres.Response.GetEndBlock().Diffs, nil
|
||||
}
|
@ -42,7 +42,7 @@ type remoteClient struct {
|
||||
resCb func(*types.Request, *types.Response) // listens to all callbacks
|
||||
}
|
||||
|
||||
func NewClient(addr string, mustConnect bool) (*remoteClient, error) {
|
||||
func NewSocketClient(addr string, mustConnect bool) (*remoteClient, error) {
|
||||
cli := &remoteClient{
|
||||
reqQueue: make(chan *ReqRes, reqQueueSize),
|
||||
flushTimer: NewThrottleTimer("remoteClient", flushThrottleMS),
|
||||
@ -129,7 +129,7 @@ func (cli *remoteClient) sendValueRoutine(conn net.Conn) {
|
||||
select {
|
||||
case <-cli.flushTimer.Ch:
|
||||
select {
|
||||
case cli.reqQueue <- NewReqRes(types.ToRequestFlush()):
|
||||
case cli.reqQueue <- NewReqRes(types.ToRequestFlush()): // cant this block ?
|
||||
default:
|
||||
// Probably will fill the buffer, or retry later.
|
||||
}
|
||||
@ -369,6 +369,7 @@ func (cli *remoteClient) EndBlockSync(height uint64) (validators []*types.Valida
|
||||
|
||||
func (cli *remoteClient) queueRequest(req *types.Request) *ReqRes {
|
||||
reqres := NewReqRes(req)
|
||||
|
||||
// TODO: set cli.err if reqQueue times out
|
||||
cli.reqQueue <- reqres
|
||||
|
||||
|
@ -6,31 +6,20 @@ import (
|
||||
. "github.com/tendermint/go-common"
|
||||
"github.com/tendermint/tmsp/example/counter"
|
||||
"github.com/tendermint/tmsp/server"
|
||||
"github.com/tendermint/tmsp/types"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
addrPtr := flag.String("addr", "tcp://0.0.0.0:46658", "Listen address")
|
||||
grpcPtr := flag.String("tmsp", "socket", "TMSP server: socket | grpc")
|
||||
tmspPtr := flag.String("tmsp", "socket", "TMSP server: socket | grpc")
|
||||
serialPtr := flag.Bool("serial", false, "Enforce incrementing (serial) txs")
|
||||
flag.Parse()
|
||||
app := counter.NewCounterApplication(*serialPtr)
|
||||
|
||||
// Start the listener
|
||||
switch *grpcPtr {
|
||||
case "socket":
|
||||
_, err := server.NewServer(*addrPtr, app)
|
||||
if err != nil {
|
||||
Exit(err.Error())
|
||||
}
|
||||
case "grpc":
|
||||
_, err := server.NewGRPCServer(*addrPtr, types.NewGRPCApplication(app))
|
||||
if err != nil {
|
||||
Exit(err.Error())
|
||||
}
|
||||
default:
|
||||
Exit(Fmt("Unknown server type %s", *grpcPtr))
|
||||
_, err := server.NewServer(*addrPtr, *tmspPtr, app)
|
||||
if err != nil {
|
||||
Exit(err.Error())
|
||||
}
|
||||
|
||||
// Wait forever
|
||||
|
@ -11,10 +11,11 @@ import (
|
||||
func main() {
|
||||
|
||||
addrPtr := flag.String("addr", "tcp://0.0.0.0:46658", "Listen address")
|
||||
tmspPtr := flag.String("tmsp", "socket", "socket | grpc")
|
||||
flag.Parse()
|
||||
|
||||
// Start the listener
|
||||
_, err := server.NewServer(*addrPtr, dummy.NewDummyApplication())
|
||||
_, err := server.NewServer(*addrPtr, *tmspPtr, dummy.NewDummyApplication())
|
||||
if err != nil {
|
||||
Exit(err.Error())
|
||||
}
|
||||
|
@ -29,6 +29,13 @@ func main() {
|
||||
Usage: "address of application socket",
|
||||
},
|
||||
}
|
||||
app.Flags = []cli.Flag{
|
||||
cli.StringFlag{
|
||||
Name: "tmsp",
|
||||
Value: "socket",
|
||||
Usage: "socket or grpc",
|
||||
},
|
||||
}
|
||||
app.Commands = []cli.Command{
|
||||
{
|
||||
Name: "batch",
|
||||
@ -105,7 +112,7 @@ func main() {
|
||||
func before(c *cli.Context) error {
|
||||
if client == nil {
|
||||
var err error
|
||||
client, err = tmspcli.NewClient(c.GlobalString("address"), false)
|
||||
client, err = tmspcli.NewClient(c.GlobalString("address"), c.GlobalString("tmsp"), false)
|
||||
if err != nil {
|
||||
Exit(err.Error())
|
||||
}
|
||||
|
@ -15,7 +15,7 @@ func TestStream(t *testing.T) {
|
||||
numAppendTxs := 200000
|
||||
|
||||
// Start the listener
|
||||
server, err := server.NewServer("unix://test.sock", NewDummyApplication())
|
||||
server, err := server.NewSocketServer("unix://test.sock", NewDummyApplication())
|
||||
if err != nil {
|
||||
Exit(err.Error())
|
||||
}
|
||||
|
@ -19,7 +19,7 @@ func TestStream(t *testing.T) {
|
||||
numAppendTxs := 200000
|
||||
|
||||
// Start the listener
|
||||
server, err := server.NewServer("unix://test.sock", NewNilApplication())
|
||||
server, err := server.NewSocketServer("unix://test.sock", NewNilApplication())
|
||||
if err != nil {
|
||||
Exit(err.Error())
|
||||
}
|
||||
|
189
server/server.go
189
server/server.go
@ -1,191 +1,22 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
. "github.com/tendermint/go-common"
|
||||
"github.com/tendermint/tmsp/types"
|
||||
)
|
||||
|
||||
// var maxNumberConnections = 2
|
||||
|
||||
type Server struct {
|
||||
QuitService
|
||||
|
||||
proto string
|
||||
addr string
|
||||
listener net.Listener
|
||||
|
||||
appMtx sync.Mutex
|
||||
app types.Application
|
||||
}
|
||||
|
||||
func NewServer(protoAddr string, app types.Application) (Service, error) {
|
||||
parts := strings.SplitN(protoAddr, "://", 2)
|
||||
proto, addr := parts[0], parts[1]
|
||||
s := &Server{
|
||||
proto: proto,
|
||||
addr: addr,
|
||||
listener: nil,
|
||||
app: app,
|
||||
func NewServer(protoAddr, transport string, app types.Application) (Service, error) {
|
||||
var s Service
|
||||
var err error
|
||||
switch transport {
|
||||
case "socket":
|
||||
s, err = NewSocketServer(protoAddr, app)
|
||||
case "grpc":
|
||||
s, err = NewGRPCServer(protoAddr, types.NewGRPCApplication(app))
|
||||
default:
|
||||
err = fmt.Errorf("Unknown server type %s", transport)
|
||||
}
|
||||
s.QuitService = *NewQuitService(nil, "TMSPServer", s)
|
||||
_, err := s.Start() // Just start it
|
||||
return s, err
|
||||
}
|
||||
|
||||
func (s *Server) OnStart() error {
|
||||
s.QuitService.OnStart()
|
||||
ln, err := net.Listen(s.proto, s.addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.listener = ln
|
||||
go s.acceptConnectionsRoutine()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) OnStop() {
|
||||
s.QuitService.OnStop()
|
||||
s.listener.Close()
|
||||
}
|
||||
|
||||
func (s *Server) acceptConnectionsRoutine() {
|
||||
// semaphore := make(chan struct{}, maxNumberConnections)
|
||||
|
||||
for {
|
||||
// semaphore <- struct{}{}
|
||||
|
||||
// Accept a connection
|
||||
fmt.Println("Waiting for new connection...")
|
||||
conn, err := s.listener.Accept()
|
||||
if err != nil {
|
||||
if !s.IsRunning() {
|
||||
return // Ignore error from listener closing.
|
||||
}
|
||||
Exit("Failed to accept connection: " + err.Error())
|
||||
} else {
|
||||
fmt.Println("Accepted a new connection")
|
||||
}
|
||||
|
||||
closeConn := make(chan error, 2) // Push to signal connection closed
|
||||
responses := make(chan *types.Response, 1000) // A channel to buffer responses
|
||||
|
||||
// Read requests from conn and deal with them
|
||||
go s.handleRequests(closeConn, conn, responses)
|
||||
// Pull responses from 'responses' and write them to conn.
|
||||
go s.handleResponses(closeConn, responses, conn)
|
||||
|
||||
go func() {
|
||||
// Wait until signal to close connection
|
||||
errClose := <-closeConn
|
||||
if errClose != nil {
|
||||
fmt.Printf("Connection error: %v\n", errClose)
|
||||
} else {
|
||||
fmt.Println("Connection was closed.")
|
||||
}
|
||||
|
||||
// Close the connection
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
fmt.Printf("Error in closing connection: %v\n", err)
|
||||
}
|
||||
|
||||
// <-semaphore
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// Read requests from conn and deal with them
|
||||
func (s *Server) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
|
||||
var count int
|
||||
var bufReader = bufio.NewReader(conn)
|
||||
for {
|
||||
|
||||
var req = &types.Request{}
|
||||
err := types.ReadMessage(bufReader, req)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
closeConn <- fmt.Errorf("Connection closed by client")
|
||||
} else {
|
||||
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
s.appMtx.Lock()
|
||||
count++
|
||||
s.handleRequest(req, responses)
|
||||
s.appMtx.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) handleRequest(req *types.Request, responses chan<- *types.Response) {
|
||||
switch r := req.Value.(type) {
|
||||
case *types.Request_Echo:
|
||||
responses <- types.ToResponseEcho(r.Echo.Message)
|
||||
case *types.Request_Flush:
|
||||
responses <- types.ToResponseFlush()
|
||||
case *types.Request_Info:
|
||||
data := s.app.Info()
|
||||
responses <- types.ToResponseInfo(data)
|
||||
case *types.Request_SetOption:
|
||||
so := r.SetOption
|
||||
logStr := s.app.SetOption(so.Key, so.Value)
|
||||
responses <- types.ToResponseSetOption(logStr)
|
||||
case *types.Request_AppendTx:
|
||||
res := s.app.AppendTx(r.AppendTx.Tx)
|
||||
responses <- types.ToResponseAppendTx(res.Code, res.Data, res.Log)
|
||||
case *types.Request_CheckTx:
|
||||
res := s.app.CheckTx(r.CheckTx.Tx)
|
||||
responses <- types.ToResponseCheckTx(res.Code, res.Data, res.Log)
|
||||
case *types.Request_Commit:
|
||||
res := s.app.Commit()
|
||||
responses <- types.ToResponseCommit(res.Code, res.Data, res.Log)
|
||||
case *types.Request_Query:
|
||||
res := s.app.Query(r.Query.Query)
|
||||
responses <- types.ToResponseQuery(res.Code, res.Data, res.Log)
|
||||
case *types.Request_InitChain:
|
||||
if app, ok := s.app.(types.BlockchainAware); ok {
|
||||
app.InitChain(r.InitChain.Validators)
|
||||
responses <- types.ToResponseInitChain()
|
||||
} else {
|
||||
responses <- types.ToResponseInitChain()
|
||||
}
|
||||
case *types.Request_EndBlock:
|
||||
if app, ok := s.app.(types.BlockchainAware); ok {
|
||||
validators := app.EndBlock(r.EndBlock.Height)
|
||||
responses <- types.ToResponseEndBlock(validators)
|
||||
} else {
|
||||
responses <- types.ToResponseEndBlock(nil)
|
||||
}
|
||||
default:
|
||||
responses <- types.ToResponseException("Unknown request")
|
||||
}
|
||||
}
|
||||
|
||||
// Pull responses from 'responses' and write them to conn.
|
||||
func (s *Server) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) {
|
||||
var count int
|
||||
var bufWriter = bufio.NewWriter(conn)
|
||||
for {
|
||||
var res = <-responses
|
||||
err := types.WriteMessage(res, bufWriter)
|
||||
if err != nil {
|
||||
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
|
||||
return
|
||||
}
|
||||
if _, ok := res.Value.(*types.Response_Flush); ok {
|
||||
err = bufWriter.Flush()
|
||||
if err != nil {
|
||||
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
count++
|
||||
}
|
||||
}
|
||||
|
191
server/socket_server.go
Normal file
191
server/socket_server.go
Normal file
@ -0,0 +1,191 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
. "github.com/tendermint/go-common"
|
||||
"github.com/tendermint/tmsp/types"
|
||||
)
|
||||
|
||||
// var maxNumberConnections = 2
|
||||
|
||||
type SocketServer struct {
|
||||
QuitService
|
||||
|
||||
proto string
|
||||
addr string
|
||||
listener net.Listener
|
||||
|
||||
appMtx sync.Mutex
|
||||
app types.Application
|
||||
}
|
||||
|
||||
func NewSocketServer(protoAddr string, app types.Application) (Service, error) {
|
||||
parts := strings.SplitN(protoAddr, "://", 2)
|
||||
proto, addr := parts[0], parts[1]
|
||||
s := &SocketServer{
|
||||
proto: proto,
|
||||
addr: addr,
|
||||
listener: nil,
|
||||
app: app,
|
||||
}
|
||||
s.QuitService = *NewQuitService(nil, "TMSPServer", s)
|
||||
_, err := s.Start() // Just start it
|
||||
return s, err
|
||||
}
|
||||
|
||||
func (s *SocketServer) OnStart() error {
|
||||
s.QuitService.OnStart()
|
||||
ln, err := net.Listen(s.proto, s.addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.listener = ln
|
||||
go s.acceptConnectionsRoutine()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SocketServer) OnStop() {
|
||||
s.QuitService.OnStop()
|
||||
s.listener.Close()
|
||||
}
|
||||
|
||||
func (s *SocketServer) acceptConnectionsRoutine() {
|
||||
// semaphore := make(chan struct{}, maxNumberConnections)
|
||||
|
||||
for {
|
||||
// semaphore <- struct{}{}
|
||||
|
||||
// Accept a connection
|
||||
fmt.Println("Waiting for new connection...")
|
||||
conn, err := s.listener.Accept()
|
||||
if err != nil {
|
||||
if !s.IsRunning() {
|
||||
return // Ignore error from listener closing.
|
||||
}
|
||||
Exit("Failed to accept connection: " + err.Error())
|
||||
} else {
|
||||
fmt.Println("Accepted a new connection")
|
||||
}
|
||||
|
||||
closeConn := make(chan error, 2) // Push to signal connection closed
|
||||
responses := make(chan *types.Response, 1000) // A channel to buffer responses
|
||||
|
||||
// Read requests from conn and deal with them
|
||||
go s.handleRequests(closeConn, conn, responses)
|
||||
// Pull responses from 'responses' and write them to conn.
|
||||
go s.handleResponses(closeConn, responses, conn)
|
||||
|
||||
go func() {
|
||||
// Wait until signal to close connection
|
||||
errClose := <-closeConn
|
||||
if errClose != nil {
|
||||
fmt.Printf("Connection error: %v\n", errClose)
|
||||
} else {
|
||||
fmt.Println("Connection was closed.")
|
||||
}
|
||||
|
||||
// Close the connection
|
||||
err := conn.Close()
|
||||
if err != nil {
|
||||
fmt.Printf("Error in closing connection: %v\n", err)
|
||||
}
|
||||
|
||||
// <-semaphore
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// Read requests from conn and deal with them
|
||||
func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
|
||||
var count int
|
||||
var bufReader = bufio.NewReader(conn)
|
||||
for {
|
||||
|
||||
var req = &types.Request{}
|
||||
err := types.ReadMessage(bufReader, req)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
closeConn <- fmt.Errorf("Connection closed by client")
|
||||
} else {
|
||||
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
s.appMtx.Lock()
|
||||
count++
|
||||
s.handleRequest(req, responses)
|
||||
s.appMtx.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types.Response) {
|
||||
switch r := req.Value.(type) {
|
||||
case *types.Request_Echo:
|
||||
responses <- types.ToResponseEcho(r.Echo.Message)
|
||||
case *types.Request_Flush:
|
||||
responses <- types.ToResponseFlush()
|
||||
case *types.Request_Info:
|
||||
data := s.app.Info()
|
||||
responses <- types.ToResponseInfo(data)
|
||||
case *types.Request_SetOption:
|
||||
so := r.SetOption
|
||||
logStr := s.app.SetOption(so.Key, so.Value)
|
||||
responses <- types.ToResponseSetOption(logStr)
|
||||
case *types.Request_AppendTx:
|
||||
res := s.app.AppendTx(r.AppendTx.Tx)
|
||||
responses <- types.ToResponseAppendTx(res.Code, res.Data, res.Log)
|
||||
case *types.Request_CheckTx:
|
||||
res := s.app.CheckTx(r.CheckTx.Tx)
|
||||
responses <- types.ToResponseCheckTx(res.Code, res.Data, res.Log)
|
||||
case *types.Request_Commit:
|
||||
res := s.app.Commit()
|
||||
responses <- types.ToResponseCommit(res.Code, res.Data, res.Log)
|
||||
case *types.Request_Query:
|
||||
res := s.app.Query(r.Query.Query)
|
||||
responses <- types.ToResponseQuery(res.Code, res.Data, res.Log)
|
||||
case *types.Request_InitChain:
|
||||
if app, ok := s.app.(types.BlockchainAware); ok {
|
||||
app.InitChain(r.InitChain.Validators)
|
||||
responses <- types.ToResponseInitChain()
|
||||
} else {
|
||||
responses <- types.ToResponseInitChain()
|
||||
}
|
||||
case *types.Request_EndBlock:
|
||||
if app, ok := s.app.(types.BlockchainAware); ok {
|
||||
validators := app.EndBlock(r.EndBlock.Height)
|
||||
responses <- types.ToResponseEndBlock(validators)
|
||||
} else {
|
||||
responses <- types.ToResponseEndBlock(nil)
|
||||
}
|
||||
default:
|
||||
responses <- types.ToResponseException("Unknown request")
|
||||
}
|
||||
}
|
||||
|
||||
// Pull responses from 'responses' and write them to conn.
|
||||
func (s *SocketServer) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) {
|
||||
var count int
|
||||
var bufWriter = bufio.NewWriter(conn)
|
||||
for {
|
||||
var res = <-responses
|
||||
err := types.WriteMessage(res, bufWriter)
|
||||
if err != nil {
|
||||
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
|
||||
return
|
||||
}
|
||||
if _, ok := res.Value.(*types.Response_Flush); ok {
|
||||
err = bufWriter.Flush()
|
||||
if err != nil {
|
||||
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
count++
|
||||
}
|
||||
}
|
@ -5,5 +5,8 @@ cd $ROOT
|
||||
# test golang counter
|
||||
COUNTER_APP="counter" go run $ROOT/tests/test_counter.go
|
||||
|
||||
# test golang counter via grpc
|
||||
COUNTER_APP="counter -tmsp=grpc" go run $ROOT/tests/test_counter.go -tmsp=grpc
|
||||
|
||||
# test nodejs counter
|
||||
COUNTER_APP="node ../js-tmsp/example/app.js" go run $ROOT/tests/test_counter.go
|
||||
|
@ -2,6 +2,7 @@ package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
@ -12,7 +13,10 @@ import (
|
||||
"github.com/tendermint/tmsp/types"
|
||||
)
|
||||
|
||||
var tmspPtr = flag.String("tmsp", "socket", "socket or grpc")
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
|
||||
// Run tests
|
||||
testBasic()
|
||||
@ -70,7 +74,7 @@ func startApp() *process.Process {
|
||||
|
||||
func startClient() tmspcli.Client {
|
||||
// Start client
|
||||
client, err := tmspcli.NewClient("tcp://127.0.0.1:46658", true)
|
||||
client, err := tmspcli.NewClient("tcp://127.0.0.1:46658", *tmspPtr, true)
|
||||
if err != nil {
|
||||
panic("connecting to counter_app: " + err.Error())
|
||||
}
|
||||
|
Reference in New Issue
Block a user