diff --git a/client/client.go b/client/client.go index 3afb838f..291c4863 100644 --- a/client/client.go +++ b/client/client.go @@ -4,13 +4,15 @@ import ( "fmt" "sync" + . "github.com/tendermint/go-common" "github.com/tendermint/tmsp/types" ) type Client interface { + Service + SetResponseCallback(Callback) Error() error - Stop() bool FlushAsync() *ReqRes EchoAsync(msg string) *ReqRes diff --git a/client/grpc_client.go b/client/grpc_client.go index 745de2f8..3e44dcc1 100644 --- a/client/grpc_client.go +++ b/client/grpc_client.go @@ -1,6 +1,7 @@ package tmspcli import ( + "errors" "net" "sync" "time" @@ -43,6 +44,7 @@ func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) { func (cli *grpcClient) OnStart() error { cli.QuitService.OnStart() RETRY_LOOP: + for { conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) if err != nil { @@ -54,14 +56,52 @@ RETRY_LOOP: continue RETRY_LOOP } } - cli.client = types.NewTMSPApplicationClient(conn) + + client := types.NewTMSPApplicationClient(conn) + + ENSURE_CONNECTED: + for { + _, err := client.Echo(context.Background(), &types.RequestEcho{"hello"}, grpc.FailFast(true)) + if err == nil { + break ENSURE_CONNECTED + } + time.Sleep(time.Second) + } + + cli.client = client return nil } } func (cli *grpcClient) OnStop() { cli.QuitService.OnStop() - // TODO: how to close when TMSPApplicationClient interface doesn't expose Close ? + cli.mtx.Lock() + defer cli.mtx.Unlock() + // TODO: how to close conn? its not a net.Conn and grpc doesn't expose a Close() + /*if cli.conn != nil { + cli.conn.Close() + }*/ +} + +func (cli *grpcClient) StopForError(err error) { + cli.mtx.Lock() + if !cli.IsRunning() { + return + } + + if cli.err == nil { + cli.err = err + } + cli.mtx.Unlock() + + log.Warn(Fmt("Stopping tmsp.grpcClient for error: %v", err.Error())) + cli.Stop() +} + +func (cli *grpcClient) Error() error { + cli.mtx.Lock() + defer cli.mtx.Unlock() + return cli.err } // Set listener for all responses @@ -72,22 +112,6 @@ func (cli *grpcClient) SetResponseCallback(resCb Callback) { cli.resCb = resCb } -func (cli *grpcClient) StopForError(err error) { - cli.mtx.Lock() - log.Warn(Fmt("Stopping tmsp.grpcClient for error: %v\n", err.Error())) - if cli.err == nil { - cli.err = err - } - cli.mtx.Unlock() - cli.Stop() -} - -func (cli *grpcClient) Error() error { - cli.mtx.Lock() - defer cli.mtx.Unlock() - return cli.err -} - //---------------------------------------- // GRPC calls are synchronous, but some callbacks expect to be called asynchronously // (eg. the mempool expects to be able to lock to remove bad txs from cache). @@ -98,7 +122,7 @@ func (cli *grpcClient) Error() error { func (cli *grpcClient) EchoAsync(msg string) *ReqRes { req := types.ToRequestEcho(msg) - res, err := cli.client.Echo(context.Background(), req.GetEcho()) + res, err := cli.client.Echo(context.Background(), req.GetEcho(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -107,7 +131,7 @@ func (cli *grpcClient) EchoAsync(msg string) *ReqRes { func (cli *grpcClient) FlushAsync() *ReqRes { req := types.ToRequestFlush() - res, err := cli.client.Flush(context.Background(), req.GetFlush()) + res, err := cli.client.Flush(context.Background(), req.GetFlush(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -116,7 +140,7 @@ func (cli *grpcClient) FlushAsync() *ReqRes { func (cli *grpcClient) InfoAsync() *ReqRes { req := types.ToRequestInfo() - res, err := cli.client.Info(context.Background(), req.GetInfo()) + res, err := cli.client.Info(context.Background(), req.GetInfo(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -125,7 +149,7 @@ func (cli *grpcClient) InfoAsync() *ReqRes { func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes { req := types.ToRequestSetOption(key, value) - res, err := cli.client.SetOption(context.Background(), req.GetSetOption()) + res, err := cli.client.SetOption(context.Background(), req.GetSetOption(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -134,7 +158,7 @@ func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes { func (cli *grpcClient) AppendTxAsync(tx []byte) *ReqRes { req := types.ToRequestAppendTx(tx) - res, err := cli.client.AppendTx(context.Background(), req.GetAppendTx()) + res, err := cli.client.AppendTx(context.Background(), req.GetAppendTx(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -143,7 +167,7 @@ func (cli *grpcClient) AppendTxAsync(tx []byte) *ReqRes { func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes { req := types.ToRequestCheckTx(tx) - res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx()) + res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -152,7 +176,7 @@ func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes { func (cli *grpcClient) QueryAsync(query []byte) *ReqRes { req := types.ToRequestQuery(query) - res, err := cli.client.Query(context.Background(), req.GetQuery()) + res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -161,7 +185,7 @@ func (cli *grpcClient) QueryAsync(query []byte) *ReqRes { func (cli *grpcClient) CommitAsync() *ReqRes { req := types.ToRequestCommit() - res, err := cli.client.Commit(context.Background(), req.GetCommit()) + res, err := cli.client.Commit(context.Background(), req.GetCommit(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -170,7 +194,7 @@ func (cli *grpcClient) CommitAsync() *ReqRes { func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes { req := types.ToRequestInitChain(validators) - res, err := cli.client.InitChain(context.Background(), req.GetInitChain()) + res, err := cli.client.InitChain(context.Background(), req.GetInitChain(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -179,7 +203,7 @@ func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes { func (cli *grpcClient) BeginBlockAsync(height uint64) *ReqRes { req := types.ToRequestBeginBlock(height) - res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock()) + res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -188,7 +212,7 @@ func (cli *grpcClient) BeginBlockAsync(height uint64) *ReqRes { func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes { req := types.ToRequestEndBlock(height) - res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock()) + res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock(), grpc.FailFast(true)) if err != nil { cli.err = err } @@ -216,11 +240,35 @@ func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) return reqres } +func (cli *grpcClient) checkErrGetResult() *types.Result { + if cli.err != nil { + errorLog := cli.err.Error() + cli.StopForError(cli.err) + result := types.ErrInternalError + result.SetLog(errorLog) + return &result + } + return nil +} + +func (cli *grpcClient) checkGetErr() error { + if cli.err != nil { + err := errors.New(cli.err.Error()) + cli.StopForError(cli.err) + return err + } + return nil +} + //---------------------------------------- func (cli *grpcClient) EchoSync(msg string) (res types.Result) { - r := cli.EchoAsync(msg).Response.GetEcho() - return types.NewResultOK([]byte(r.Message), LOG) + reqres := cli.EchoAsync(msg) + if res := cli.checkErrGetResult(); res != nil { + return *res + } + resp := reqres.Response.GetEcho() + return types.NewResultOK([]byte(resp.Message), LOG) } func (cli *grpcClient) FlushSync() error { @@ -228,14 +276,18 @@ func (cli *grpcClient) FlushSync() error { } func (cli *grpcClient) InfoSync() (res types.Result) { - r := cli.InfoAsync().Response.GetInfo() - return types.NewResultOK([]byte(r.Info), LOG) + reqres := cli.InfoAsync() + if res := cli.checkErrGetResult(); res != nil { + return *res + } + resp := reqres.Response.GetInfo() + return types.NewResultOK([]byte(resp.Info), LOG) } func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) { reqres := cli.SetOptionAsync(key, value) - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) + if res := cli.checkErrGetResult(); res != nil { + return *res } resp := reqres.Response.GetSetOption() return types.Result{Code: OK, Data: nil, Log: resp.Log} @@ -243,8 +295,8 @@ func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) { reqres := cli.AppendTxAsync(tx) - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) + if res := cli.checkErrGetResult(); res != nil { + return *res } resp := reqres.Response.GetAppendTx() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} @@ -252,8 +304,8 @@ func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) { func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) { reqres := cli.CheckTxAsync(tx) - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) + if res := cli.checkErrGetResult(); res != nil { + return *res } resp := reqres.Response.GetCheckTx() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} @@ -261,8 +313,8 @@ func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) { func (cli *grpcClient) QuerySync(query []byte) (res types.Result) { reqres := cli.QueryAsync(query) - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) + if res := cli.checkErrGetResult(); res != nil { + return *res } resp := reqres.Response.GetQuery() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} @@ -270,8 +322,8 @@ func (cli *grpcClient) QuerySync(query []byte) (res types.Result) { func (cli *grpcClient) CommitSync() (res types.Result) { reqres := cli.CommitAsync() - if cli.err != nil { - return types.ErrInternalError.SetLog(cli.err.Error()) + if res := cli.checkErrGetResult(); res != nil { + return *res } resp := reqres.Response.GetCommit() return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} @@ -279,24 +331,24 @@ func (cli *grpcClient) CommitSync() (res types.Result) { func (cli *grpcClient) InitChainSync(validators []*types.Validator) (err error) { cli.InitChainAsync(validators) - if cli.err != nil { - return cli.err + if err := cli.checkGetErr(); err != nil { + return err } return nil } func (cli *grpcClient) BeginBlockSync(height uint64) (err error) { cli.BeginBlockAsync(height) - if cli.err != nil { - return cli.err + if err := cli.checkGetErr(); err != nil { + return err } return nil } func (cli *grpcClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) { reqres := cli.EndBlockAsync(height) - if cli.err != nil { - return nil, cli.err + if err := cli.checkGetErr(); err != nil { + return nil, err } return reqres.Response.GetEndBlock().Diffs, nil } diff --git a/client/local_client.go b/client/local_client.go index 06db859b..eac5661b 100644 --- a/client/local_client.go +++ b/client/local_client.go @@ -1,11 +1,14 @@ package tmspcli import ( - types "github.com/tendermint/tmsp/types" "sync" + + . "github.com/tendermint/go-common" + types "github.com/tendermint/tmsp/types" ) type localClient struct { + *BaseService mtx *sync.Mutex types.Application Callback @@ -15,10 +18,12 @@ func NewLocalClient(mtx *sync.Mutex, app types.Application) *localClient { if mtx == nil { mtx = new(sync.Mutex) } - return &localClient{ + cli := &localClient{ mtx: mtx, Application: app, } + cli.BaseService = NewBaseService(log, "localClient", cli) + return cli } func (app *localClient) SetResponseCallback(cb Callback) { @@ -32,10 +37,6 @@ func (app *localClient) Error() error { return nil } -func (app *localClient) Stop() bool { - return true -} - func (app *localClient) FlushAsync() *ReqRes { // Do nothing return newLocalReqRes(types.ToRequestFlush(), nil) diff --git a/client/socket_client.go b/client/socket_client.go index ea1e7d6e..10f780bc 100644 --- a/client/socket_client.go +++ b/client/socket_client.go @@ -28,7 +28,6 @@ const flushThrottleMS = 20 // Don't wait longer than... // with concurrent callers. type socketClient struct { QuitService - sync.Mutex // [EB]: is this even used? reqQueue chan *ReqRes flushTimer *ThrottleTimer @@ -40,6 +39,7 @@ type socketClient struct { err error reqSent *list.List resCb func(*types.Request, *types.Response) // listens to all callbacks + } func NewSocketClient(addr string, mustConnect bool) (*socketClient, error) { @@ -53,49 +53,70 @@ func NewSocketClient(addr string, mustConnect bool) (*socketClient, error) { resCb: nil, } cli.QuitService = *NewQuitService(nil, "socketClient", cli) + _, err := cli.Start() // Just start it, it's confusing for callers to remember to start. return cli, err } func (cli *socketClient) OnStart() error { cli.QuitService.OnStart() + + var err error + var conn net.Conn RETRY_LOOP: for { - conn, err := Connect(cli.addr) + conn, err = Connect(cli.addr) if err != nil { if cli.mustConnect { return err } else { - log.Warn(Fmt("tmsp.socketClient failed to connect to %v. Retrying...\n", cli.addr)) + log.Warn(Fmt("tmsp.socketClient failed to connect to %v. Retrying...", cli.addr)) time.Sleep(time.Second * 3) continue RETRY_LOOP } } + cli.conn = conn + go cli.sendRequestsRoutine(conn) go cli.recvResponseRoutine(conn) - return err + + return nil } return nil // never happens } func (cli *socketClient) OnStop() { cli.QuitService.OnStop() + + cli.mtx.Lock() + defer cli.mtx.Unlock() if cli.conn != nil { cli.conn.Close() } + cli.flushQueue() } -func (cli *socketClient) flushQueue() { -LOOP: - for { - select { - case reqres := <-cli.reqQueue: - reqres.Done() - default: - break LOOP - } +// Stop the client and set the error +func (cli *socketClient) StopForError(err error) { + cli.mtx.Lock() + if !cli.IsRunning() { + return } + + if cli.err == nil { + cli.err = err + } + cli.mtx.Unlock() + + log.Warn(Fmt("Stopping tmsp.socketClient for error: %v", err.Error())) + cli.Stop() +} + +func (cli *socketClient) Error() error { + cli.mtx.Lock() + defer cli.mtx.Unlock() + return cli.err } // Set listener for all responses @@ -106,29 +127,10 @@ func (cli *socketClient) SetResponseCallback(resCb Callback) { cli.resCb = resCb } -func (cli *socketClient) StopForError(err error) { - if !cli.IsRunning() { - return - } - - cli.mtx.Lock() - log.Warn(Fmt("Stopping tmsp.socketClient for error: %v\n", err.Error())) - if cli.err == nil { - cli.err = err - } - cli.mtx.Unlock() - cli.Stop() -} - -func (cli *socketClient) Error() error { - cli.mtx.Lock() - defer cli.mtx.Unlock() - return cli.err -} - //---------------------------------------- func (cli *socketClient) sendRequestsRoutine(conn net.Conn) { + w := bufio.NewWriter(conn) for { select { @@ -144,14 +146,14 @@ func (cli *socketClient) sendRequestsRoutine(conn net.Conn) { cli.willSendReq(reqres) err := types.WriteMessage(reqres.Request, w) if err != nil { - cli.StopForError(err) + cli.StopForError(fmt.Errorf("Error writing msg: %v", err)) return } // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) if _, ok := reqres.Request.Value.(*types.Request_Flush); ok { err = w.Flush() if err != nil { - cli.StopForError(err) + cli.StopForError(fmt.Errorf("Error flushing writer: %v", err)) return } } @@ -160,6 +162,7 @@ func (cli *socketClient) sendRequestsRoutine(conn net.Conn) { } func (cli *socketClient) recvResponseRoutine(conn net.Conn) { + r := bufio.NewReader(conn) // Buffer reads for { var res = &types.Response{} @@ -172,11 +175,13 @@ func (cli *socketClient) recvResponseRoutine(conn net.Conn) { case *types.Response_Exception: // XXX After setting cli.err, release waiters (e.g. reqres.Done()) cli.StopForError(errors.New(r.Exception.Error)) + return default: // log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res) err := cli.didRecvResponse(res) if err != nil { cli.StopForError(err) + return } } } @@ -280,9 +285,8 @@ func (cli *socketClient) EchoSync(msg string) (res types.Result) { func (cli *socketClient) FlushSync() error { reqRes := cli.queueRequest(types.ToRequestFlush()) - if reqRes == nil { - return fmt.Errorf("Remote app is not running") - + if cli.err != nil { + return types.ErrInternalError.SetLog(cli.err.Error()) } reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here return cli.err @@ -378,10 +382,6 @@ func (cli *socketClient) EndBlockSync(height uint64) (validators []*types.Valida //---------------------------------------- func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { - if !cli.IsRunning() { - return nil - } - reqres := NewReqRes(req) // TODO: set cli.err if reqQueue times out @@ -398,6 +398,18 @@ func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { return reqres } +func (cli *socketClient) flushQueue() { +LOOP: + for { + select { + case reqres := <-cli.reqQueue: + reqres.Done() + default: + break LOOP + } + } +} + //---------------------------------------- func resMatchesReq(req *types.Request, res *types.Response) (ok bool) { diff --git a/example/dummy/dummy_test.go b/example/dummy/dummy_test.go deleted file mode 100644 index c5ac2d27..00000000 --- a/example/dummy/dummy_test.go +++ /dev/null @@ -1,93 +0,0 @@ -package dummy - -import ( - "reflect" - "testing" - "time" - - . "github.com/tendermint/go-common" - "github.com/tendermint/tmsp/server" - "github.com/tendermint/tmsp/types" -) - -func TestStream(t *testing.T) { - - numAppendTxs := 200000 - - // Start the listener - server, err := server.NewSocketServer("unix://test.sock", NewDummyApplication()) - if err != nil { - Exit(err.Error()) - } - defer server.Stop() - - // Connect to the socket - conn, err := Connect("unix://test.sock") - if err != nil { - Exit(err.Error()) - } - - // Read response data - done := make(chan struct{}) - go func() { - counter := 0 - for { - - var res = &types.Response{} - err := types.ReadMessage(conn, res) - if err != nil { - Exit(err.Error()) - } - - // Process response - switch r := res.Value.(type) { - case *types.Response_AppendTx: - counter += 1 - if r.AppendTx.Code != types.CodeType_OK { - t.Error("AppendTx failed with ret_code", r.AppendTx.Code) - } - if counter > numAppendTxs { - t.Fatal("Too many AppendTx responses") - } - t.Log("response", counter) - if counter == numAppendTxs { - go func() { - time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow - close(done) - }() - } - case *types.Response_Flush: - // ignore - default: - t.Error("Unexpected response type", reflect.TypeOf(res.Value)) - } - } - }() - - // Write requests - for counter := 0; counter < numAppendTxs; counter++ { - // Send request - var req = types.ToRequestAppendTx([]byte("test")) - err := types.WriteMessage(req, conn) - if err != nil { - t.Fatal(err.Error()) - } - - // Sometimes send flush messages - if counter%123 == 0 { - t.Log("flush") - err := types.WriteMessage(types.ToRequestFlush(), conn) - if err != nil { - t.Fatal(err.Error()) - } - } - } - - // Send final flush message - err = types.WriteMessage(types.ToRequestFlush(), conn) - if err != nil { - t.Fatal(err.Error()) - } - - <-done -} diff --git a/example/example_test.go b/example/example_test.go new file mode 100644 index 00000000..fc9066d8 --- /dev/null +++ b/example/example_test.go @@ -0,0 +1,151 @@ +package nilapp + +import ( + "fmt" + "net" + "reflect" + "testing" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + + . "github.com/tendermint/go-common" + "github.com/tendermint/tmsp/client" + "github.com/tendermint/tmsp/example/dummy" + nilapp "github.com/tendermint/tmsp/example/nil" + "github.com/tendermint/tmsp/server" + "github.com/tendermint/tmsp/types" +) + +func TestDummy(t *testing.T) { + fmt.Println("### Testing Dummy") + testStream(t, dummy.NewDummyApplication()) +} + +func TestNilApp(t *testing.T) { + fmt.Println("### Testing NilApp") + testStream(t, nilapp.NewNilApplication()) +} + +func TestGRPC(t *testing.T) { + fmt.Println("### Testing GRPC") + testGRPCSync(t, types.NewGRPCApplication(nilapp.NewNilApplication())) +} + +func testStream(t *testing.T, app types.Application) { + + numAppendTxs := 200000 + + // Start the listener + server, err := server.NewSocketServer("unix://test.sock", app) + if err != nil { + Exit(Fmt("Error starting socket server: %v", err.Error())) + } + defer server.Stop() + + // Connect to the socket + client, err := tmspcli.NewSocketClient("unix://test.sock", false) + if err != nil { + Exit(Fmt("Error starting socket client: %v", err.Error())) + } + client.Start() + defer client.Stop() + + done := make(chan struct{}) + counter := 0 + client.SetResponseCallback(func(req *types.Request, res *types.Response) { + // Process response + switch r := res.Value.(type) { + case *types.Response_AppendTx: + counter += 1 + if r.AppendTx.Code != types.CodeType_OK { + t.Error("AppendTx failed with ret_code", r.AppendTx.Code) + } + if counter > numAppendTxs { + t.Fatalf("Too many AppendTx responses. Got %d, expected %d", counter, numAppendTxs) + } + if counter == numAppendTxs { + go func() { + time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow + close(done) + }() + return + } + case *types.Response_Flush: + // ignore + default: + t.Error("Unexpected response type", reflect.TypeOf(res.Value)) + } + }) + + // Write requests + for counter := 0; counter < numAppendTxs; counter++ { + // Send request + reqRes := client.AppendTxAsync([]byte("test")) + _ = reqRes + // check err ? + + // Sometimes send flush messages + if counter%123 == 0 { + client.FlushAsync() + // check err ? + } + } + + // Send final flush message + client.FlushAsync() + + <-done +} + +//------------------------- +// test grpc + +func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) { + return Connect(addr) +} + +func testGRPCSync(t *testing.T, app *types.GRPCApplication) { + + numAppendTxs := 2000 + + // Start the listener + server, err := server.NewGRPCServer("unix://test.sock", app) + if err != nil { + Exit(Fmt("Error starting GRPC server: %v", err.Error())) + } + defer server.Stop() + + // Connect to the socket + conn, err := grpc.Dial("unix://test.sock", grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) + if err != nil { + Exit(Fmt("Error dialing GRPC server: %v", err.Error())) + } + defer conn.Close() + + client := types.NewTMSPApplicationClient(conn) + + // Write requests + for counter := 0; counter < numAppendTxs; counter++ { + // Send request + response, err := client.AppendTx(context.Background(), &types.RequestAppendTx{[]byte("test")}) + if err != nil { + t.Fatalf("Error in GRPC AppendTx: %v", err.Error()) + } + counter += 1 + if response.Code != types.CodeType_OK { + t.Error("AppendTx failed with ret_code", response.Code) + } + if counter > numAppendTxs { + t.Fatal("Too many AppendTx responses") + } + t.Log("response", counter) + if counter == numAppendTxs { + go func() { + time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow + }() + } + + } +} diff --git a/example/nil/nil_test.go b/example/nil/nil_test.go deleted file mode 100644 index ed7a36dd..00000000 --- a/example/nil/nil_test.go +++ /dev/null @@ -1,148 +0,0 @@ -package nilapp - -import ( - "net" - "reflect" - "testing" - "time" - - "golang.org/x/net/context" - "google.golang.org/grpc" - - . "github.com/tendermint/go-common" - "github.com/tendermint/tmsp/server" - "github.com/tendermint/tmsp/types" -) - -func TestStream(t *testing.T) { - - numAppendTxs := 200000 - - // Start the listener - server, err := server.NewSocketServer("unix://test.sock", NewNilApplication()) - if err != nil { - Exit(err.Error()) - } - defer server.Stop() - - // Connect to the socket - conn, err := Connect("unix://test.sock") - if err != nil { - Exit(err.Error()) - } - - // Read response data - done := make(chan struct{}) - go func() { - counter := 0 - for { - - var res = &types.Response{} - err := types.ReadMessage(conn, res) - if err != nil { - Exit(err.Error()) - } - - // Process response - switch r := res.Value.(type) { - case *types.Response_AppendTx: - counter += 1 - if r.AppendTx.Code != types.CodeType_OK { - t.Error("AppendTx failed with ret_code", r.AppendTx.Code) - } - if counter > numAppendTxs { - t.Fatal("Too many AppendTx responses") - } - t.Log("response", counter) - if counter == numAppendTxs { - go func() { - time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow - close(done) - }() - } - case *types.Response_Flush: - // ignore - default: - t.Error("Unexpected response type", reflect.TypeOf(res.Value)) - } - } - }() - - // Write requests - for counter := 0; counter < numAppendTxs; counter++ { - // Send request - var req = types.ToRequestAppendTx([]byte("test")) - err := types.WriteMessage(req, conn) - if err != nil { - t.Fatal(err.Error()) - } - - // Sometimes send flush messages - if counter%123 == 0 { - t.Log("flush") - err := types.WriteMessage(types.ToRequestFlush(), conn) - if err != nil { - t.Fatal(err.Error()) - } - } - } - - // Send final flush message - err = types.WriteMessage(types.ToRequestFlush(), conn) - if err != nil { - t.Fatal(err.Error()) - } - - <-done -} - -//------------------------- -// test grpc - -func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) { - return Connect(addr) -} - -func TestGRPCSync(t *testing.T) { - - numAppendTxs := 2000 - - // Start the listener - server, err := server.NewGRPCServer("unix://test.sock", types.NewGRPCApplication(NewNilApplication())) - if err != nil { - Exit(err.Error()) - } - defer server.Stop() - - // Connect to the socket - conn, err := grpc.Dial("unix://test.sock", grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) - if err != nil { - Exit(err.Error()) - } - defer conn.Close() - - client := types.NewTMSPApplicationClient(conn) - - // Write requests - for counter := 0; counter < numAppendTxs; counter++ { - // Send request - response, err := client.AppendTx(context.Background(), &types.RequestAppendTx{[]byte("test")}) - if err != nil { - t.Fatal(err.Error()) - } - counter += 1 - if response.Code != types.CodeType_OK { - t.Error("AppendTx failed with ret_code", response.Code) - } - if counter > numAppendTxs { - t.Fatal("Too many AppendTx responses") - } - t.Log("response", counter) - if counter == numAppendTxs { - go func() { - time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow - }() - } - - } -} diff --git a/server/grpc_server.go b/server/grpc_server.go index 3a21e3fb..02f6fa3f 100644 --- a/server/grpc_server.go +++ b/server/grpc_server.go @@ -18,6 +18,7 @@ type GRPCServer struct { proto string addr string listener net.Listener + server *grpc.Server app types.TMSPApplicationServer } @@ -43,13 +44,13 @@ func (s *GRPCServer) OnStart() error { return err } s.listener = ln - grpcServer := grpc.NewServer() - types.RegisterTMSPApplicationServer(grpcServer, s.app) - go grpcServer.Serve(ln) + s.server = grpc.NewServer() + types.RegisterTMSPApplicationServer(s.server, s.app) + go s.server.Serve(s.listener) return nil } func (s *GRPCServer) OnStop() { s.QuitService.OnStop() - s.listener.Close() + s.server.Stop() } diff --git a/server/log.go b/server/log.go new file mode 100644 index 00000000..8ac4092e --- /dev/null +++ b/server/log.go @@ -0,0 +1,7 @@ +package server + +import ( + "github.com/tendermint/go-logger" +) + +var log = logger.New("module", "tmsp-server") diff --git a/server/socket_server.go b/server/socket_server.go index 7969974e..8e837d8f 100644 --- a/server/socket_server.go +++ b/server/socket_server.go @@ -21,6 +21,10 @@ type SocketServer struct { addr string listener net.Listener + connsMtx sync.Mutex + conns map[int]net.Conn + nextConnID int + appMtx sync.Mutex app types.Application } @@ -33,6 +37,7 @@ func NewSocketServer(protoAddr string, app types.Application) (Service, error) { addr: addr, listener: nil, app: app, + conns: make(map[int]net.Conn), } s.QuitService = *NewQuitService(nil, "TMSPServer", s) _, err := s.Start() // Just start it @@ -53,6 +58,33 @@ func (s *SocketServer) OnStart() error { func (s *SocketServer) OnStop() { s.QuitService.OnStop() s.listener.Close() + + s.connsMtx.Lock() + for id, conn := range s.conns { + delete(s.conns, id) + conn.Close() + } + s.connsMtx.Unlock() +} + +func (s *SocketServer) addConn(conn net.Conn) int { + s.connsMtx.Lock() + defer s.connsMtx.Unlock() + + connID := s.nextConnID + s.nextConnID += 1 + s.conns[connID] = conn + + return connID +} + +// deletes conn even if close errs +func (s *SocketServer) rmConn(connID int, conn net.Conn) error { + s.connsMtx.Lock() + defer s.connsMtx.Unlock() + + delete(s.conns, connID) + return conn.Close() } func (s *SocketServer) acceptConnectionsRoutine() { @@ -62,7 +94,7 @@ func (s *SocketServer) acceptConnectionsRoutine() { // semaphore <- struct{}{} // Accept a connection - fmt.Println("Waiting for new connection...") + log.Notice("Waiting for new connection...") conn, err := s.listener.Accept() if err != nil { if !s.IsRunning() { @@ -70,9 +102,11 @@ func (s *SocketServer) acceptConnectionsRoutine() { } Exit("Failed to accept connection: " + err.Error()) } else { - fmt.Println("Accepted a new connection") + log.Notice("Accepted a new connection") } + connID := s.addConn(conn) + closeConn := make(chan error, 2) // Push to signal connection closed responses := make(chan *types.Response, 1000) // A channel to buffer responses @@ -84,16 +118,20 @@ func (s *SocketServer) acceptConnectionsRoutine() { go func() { // Wait until signal to close connection errClose := <-closeConn - if errClose != nil { - fmt.Printf("Connection error: %v\n", errClose) + if err == io.EOF { + log.Warn("Connection was closed by client") + return // is this correct? the conn is closed? + } else if errClose != nil { + log.Warn("Connection error", "error", errClose) } else { - fmt.Println("Connection was closed.") + // never happens + log.Warn("Connection was closed.") } // Close the connection - err := conn.Close() + err := s.rmConn(connID, conn) if err != nil { - fmt.Printf("Error in closing connection: %v\n", err) + log.Warn("Error in closing connection", "error", err) } // <-semaphore @@ -111,9 +149,9 @@ func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, respo err := types.ReadMessage(bufReader, req) if err != nil { if err == io.EOF { - closeConn <- fmt.Errorf("Connection closed by client") + closeConn <- err } else { - closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) + closeConn <- fmt.Errorf("Error reading message: %v", err.Error()) } return } @@ -176,13 +214,13 @@ func (s *SocketServer) handleResponses(closeConn chan error, responses <-chan *t var res = <-responses err := types.WriteMessage(res, bufWriter) if err != nil { - closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) + closeConn <- fmt.Errorf("Error writing message: %v", err.Error()) return } if _, ok := res.Value.(*types.Response_Flush); ok { err = bufWriter.Flush() if err != nil { - closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) + closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error()) return } } diff --git a/tests/test_cli/test.sh b/tests/test_cli/test.sh index f8920958..adb3e4bd 100644 --- a/tests/test_cli/test.sh +++ b/tests/test_cli/test.sh @@ -9,7 +9,7 @@ function testExample() { $APP &> /dev/null & sleep 2 tmsp-cli --verbose batch < $INPUT > "${INPUT}.out.new" - killall "$APP" > /dev/null + killall "$APP" &> /dev/null pre=`shasum < "${INPUT}.out"` post=`shasum < "${INPUT}.out.new"`