consolidate example tests; grpc fail fast

This commit is contained in:
Ethan Buchman
2016-08-10 18:29:46 -04:00
parent a8066f9c82
commit 1b13f14e08
11 changed files with 378 additions and 355 deletions

View File

@ -4,13 +4,15 @@ import (
"fmt" "fmt"
"sync" "sync"
. "github.com/tendermint/go-common"
"github.com/tendermint/tmsp/types" "github.com/tendermint/tmsp/types"
) )
type Client interface { type Client interface {
Service
SetResponseCallback(Callback) SetResponseCallback(Callback)
Error() error Error() error
Stop() bool
FlushAsync() *ReqRes FlushAsync() *ReqRes
EchoAsync(msg string) *ReqRes EchoAsync(msg string) *ReqRes

View File

@ -1,6 +1,7 @@
package tmspcli package tmspcli
import ( import (
"errors"
"net" "net"
"sync" "sync"
"time" "time"
@ -43,6 +44,7 @@ func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
func (cli *grpcClient) OnStart() error { func (cli *grpcClient) OnStart() error {
cli.QuitService.OnStart() cli.QuitService.OnStart()
RETRY_LOOP: RETRY_LOOP:
for { for {
conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc)) conn, err := grpc.Dial(cli.addr, grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
if err != nil { if err != nil {
@ -54,14 +56,52 @@ RETRY_LOOP:
continue RETRY_LOOP continue RETRY_LOOP
} }
} }
cli.client = types.NewTMSPApplicationClient(conn)
client := types.NewTMSPApplicationClient(conn)
ENSURE_CONNECTED:
for {
_, err := client.Echo(context.Background(), &types.RequestEcho{"hello"}, grpc.FailFast(true))
if err == nil {
break ENSURE_CONNECTED
}
time.Sleep(time.Second)
}
cli.client = client
return nil return nil
} }
} }
func (cli *grpcClient) OnStop() { func (cli *grpcClient) OnStop() {
cli.QuitService.OnStop() cli.QuitService.OnStop()
// TODO: how to close when TMSPApplicationClient interface doesn't expose Close ? cli.mtx.Lock()
defer cli.mtx.Unlock()
// TODO: how to close conn? its not a net.Conn and grpc doesn't expose a Close()
/*if cli.conn != nil {
cli.conn.Close()
}*/
}
func (cli *grpcClient) StopForError(err error) {
cli.mtx.Lock()
if !cli.IsRunning() {
return
}
if cli.err == nil {
cli.err = err
}
cli.mtx.Unlock()
log.Warn(Fmt("Stopping tmsp.grpcClient for error: %v", err.Error()))
cli.Stop()
}
func (cli *grpcClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
} }
// Set listener for all responses // Set listener for all responses
@ -72,22 +112,6 @@ func (cli *grpcClient) SetResponseCallback(resCb Callback) {
cli.resCb = resCb cli.resCb = resCb
} }
func (cli *grpcClient) StopForError(err error) {
cli.mtx.Lock()
log.Warn(Fmt("Stopping tmsp.grpcClient for error: %v\n", err.Error()))
if cli.err == nil {
cli.err = err
}
cli.mtx.Unlock()
cli.Stop()
}
func (cli *grpcClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}
//---------------------------------------- //----------------------------------------
// GRPC calls are synchronous, but some callbacks expect to be called asynchronously // GRPC calls are synchronous, but some callbacks expect to be called asynchronously
// (eg. the mempool expects to be able to lock to remove bad txs from cache). // (eg. the mempool expects to be able to lock to remove bad txs from cache).
@ -98,7 +122,7 @@ func (cli *grpcClient) Error() error {
func (cli *grpcClient) EchoAsync(msg string) *ReqRes { func (cli *grpcClient) EchoAsync(msg string) *ReqRes {
req := types.ToRequestEcho(msg) req := types.ToRequestEcho(msg)
res, err := cli.client.Echo(context.Background(), req.GetEcho()) res, err := cli.client.Echo(context.Background(), req.GetEcho(), grpc.FailFast(true))
if err != nil { if err != nil {
cli.err = err cli.err = err
} }
@ -107,7 +131,7 @@ func (cli *grpcClient) EchoAsync(msg string) *ReqRes {
func (cli *grpcClient) FlushAsync() *ReqRes { func (cli *grpcClient) FlushAsync() *ReqRes {
req := types.ToRequestFlush() req := types.ToRequestFlush()
res, err := cli.client.Flush(context.Background(), req.GetFlush()) res, err := cli.client.Flush(context.Background(), req.GetFlush(), grpc.FailFast(true))
if err != nil { if err != nil {
cli.err = err cli.err = err
} }
@ -116,7 +140,7 @@ func (cli *grpcClient) FlushAsync() *ReqRes {
func (cli *grpcClient) InfoAsync() *ReqRes { func (cli *grpcClient) InfoAsync() *ReqRes {
req := types.ToRequestInfo() req := types.ToRequestInfo()
res, err := cli.client.Info(context.Background(), req.GetInfo()) res, err := cli.client.Info(context.Background(), req.GetInfo(), grpc.FailFast(true))
if err != nil { if err != nil {
cli.err = err cli.err = err
} }
@ -125,7 +149,7 @@ func (cli *grpcClient) InfoAsync() *ReqRes {
func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes { func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes {
req := types.ToRequestSetOption(key, value) req := types.ToRequestSetOption(key, value)
res, err := cli.client.SetOption(context.Background(), req.GetSetOption()) res, err := cli.client.SetOption(context.Background(), req.GetSetOption(), grpc.FailFast(true))
if err != nil { if err != nil {
cli.err = err cli.err = err
} }
@ -134,7 +158,7 @@ func (cli *grpcClient) SetOptionAsync(key string, value string) *ReqRes {
func (cli *grpcClient) AppendTxAsync(tx []byte) *ReqRes { func (cli *grpcClient) AppendTxAsync(tx []byte) *ReqRes {
req := types.ToRequestAppendTx(tx) req := types.ToRequestAppendTx(tx)
res, err := cli.client.AppendTx(context.Background(), req.GetAppendTx()) res, err := cli.client.AppendTx(context.Background(), req.GetAppendTx(), grpc.FailFast(true))
if err != nil { if err != nil {
cli.err = err cli.err = err
} }
@ -143,7 +167,7 @@ func (cli *grpcClient) AppendTxAsync(tx []byte) *ReqRes {
func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes { func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes {
req := types.ToRequestCheckTx(tx) req := types.ToRequestCheckTx(tx)
res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx()) res, err := cli.client.CheckTx(context.Background(), req.GetCheckTx(), grpc.FailFast(true))
if err != nil { if err != nil {
cli.err = err cli.err = err
} }
@ -152,7 +176,7 @@ func (cli *grpcClient) CheckTxAsync(tx []byte) *ReqRes {
func (cli *grpcClient) QueryAsync(query []byte) *ReqRes { func (cli *grpcClient) QueryAsync(query []byte) *ReqRes {
req := types.ToRequestQuery(query) req := types.ToRequestQuery(query)
res, err := cli.client.Query(context.Background(), req.GetQuery()) res, err := cli.client.Query(context.Background(), req.GetQuery(), grpc.FailFast(true))
if err != nil { if err != nil {
cli.err = err cli.err = err
} }
@ -161,7 +185,7 @@ func (cli *grpcClient) QueryAsync(query []byte) *ReqRes {
func (cli *grpcClient) CommitAsync() *ReqRes { func (cli *grpcClient) CommitAsync() *ReqRes {
req := types.ToRequestCommit() req := types.ToRequestCommit()
res, err := cli.client.Commit(context.Background(), req.GetCommit()) res, err := cli.client.Commit(context.Background(), req.GetCommit(), grpc.FailFast(true))
if err != nil { if err != nil {
cli.err = err cli.err = err
} }
@ -170,7 +194,7 @@ func (cli *grpcClient) CommitAsync() *ReqRes {
func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes { func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes {
req := types.ToRequestInitChain(validators) req := types.ToRequestInitChain(validators)
res, err := cli.client.InitChain(context.Background(), req.GetInitChain()) res, err := cli.client.InitChain(context.Background(), req.GetInitChain(), grpc.FailFast(true))
if err != nil { if err != nil {
cli.err = err cli.err = err
} }
@ -179,7 +203,7 @@ func (cli *grpcClient) InitChainAsync(validators []*types.Validator) *ReqRes {
func (cli *grpcClient) BeginBlockAsync(height uint64) *ReqRes { func (cli *grpcClient) BeginBlockAsync(height uint64) *ReqRes {
req := types.ToRequestBeginBlock(height) req := types.ToRequestBeginBlock(height)
res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock()) res, err := cli.client.BeginBlock(context.Background(), req.GetBeginBlock(), grpc.FailFast(true))
if err != nil { if err != nil {
cli.err = err cli.err = err
} }
@ -188,7 +212,7 @@ func (cli *grpcClient) BeginBlockAsync(height uint64) *ReqRes {
func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes { func (cli *grpcClient) EndBlockAsync(height uint64) *ReqRes {
req := types.ToRequestEndBlock(height) req := types.ToRequestEndBlock(height)
res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock()) res, err := cli.client.EndBlock(context.Background(), req.GetEndBlock(), grpc.FailFast(true))
if err != nil { if err != nil {
cli.err = err cli.err = err
} }
@ -216,11 +240,35 @@ func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response)
return reqres return reqres
} }
func (cli *grpcClient) checkErrGetResult() *types.Result {
if cli.err != nil {
errorLog := cli.err.Error()
cli.StopForError(cli.err)
result := types.ErrInternalError
result.SetLog(errorLog)
return &result
}
return nil
}
func (cli *grpcClient) checkGetErr() error {
if cli.err != nil {
err := errors.New(cli.err.Error())
cli.StopForError(cli.err)
return err
}
return nil
}
//---------------------------------------- //----------------------------------------
func (cli *grpcClient) EchoSync(msg string) (res types.Result) { func (cli *grpcClient) EchoSync(msg string) (res types.Result) {
r := cli.EchoAsync(msg).Response.GetEcho() reqres := cli.EchoAsync(msg)
return types.NewResultOK([]byte(r.Message), LOG) if res := cli.checkErrGetResult(); res != nil {
return *res
}
resp := reqres.Response.GetEcho()
return types.NewResultOK([]byte(resp.Message), LOG)
} }
func (cli *grpcClient) FlushSync() error { func (cli *grpcClient) FlushSync() error {
@ -228,14 +276,18 @@ func (cli *grpcClient) FlushSync() error {
} }
func (cli *grpcClient) InfoSync() (res types.Result) { func (cli *grpcClient) InfoSync() (res types.Result) {
r := cli.InfoAsync().Response.GetInfo() reqres := cli.InfoAsync()
return types.NewResultOK([]byte(r.Info), LOG) if res := cli.checkErrGetResult(); res != nil {
return *res
}
resp := reqres.Response.GetInfo()
return types.NewResultOK([]byte(resp.Info), LOG)
} }
func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) { func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result) {
reqres := cli.SetOptionAsync(key, value) reqres := cli.SetOptionAsync(key, value)
if cli.err != nil { if res := cli.checkErrGetResult(); res != nil {
return types.ErrInternalError.SetLog(cli.err.Error()) return *res
} }
resp := reqres.Response.GetSetOption() resp := reqres.Response.GetSetOption()
return types.Result{Code: OK, Data: nil, Log: resp.Log} return types.Result{Code: OK, Data: nil, Log: resp.Log}
@ -243,8 +295,8 @@ func (cli *grpcClient) SetOptionSync(key string, value string) (res types.Result
func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) { func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) {
reqres := cli.AppendTxAsync(tx) reqres := cli.AppendTxAsync(tx)
if cli.err != nil { if res := cli.checkErrGetResult(); res != nil {
return types.ErrInternalError.SetLog(cli.err.Error()) return *res
} }
resp := reqres.Response.GetAppendTx() resp := reqres.Response.GetAppendTx()
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
@ -252,8 +304,8 @@ func (cli *grpcClient) AppendTxSync(tx []byte) (res types.Result) {
func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) { func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) {
reqres := cli.CheckTxAsync(tx) reqres := cli.CheckTxAsync(tx)
if cli.err != nil { if res := cli.checkErrGetResult(); res != nil {
return types.ErrInternalError.SetLog(cli.err.Error()) return *res
} }
resp := reqres.Response.GetCheckTx() resp := reqres.Response.GetCheckTx()
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
@ -261,8 +313,8 @@ func (cli *grpcClient) CheckTxSync(tx []byte) (res types.Result) {
func (cli *grpcClient) QuerySync(query []byte) (res types.Result) { func (cli *grpcClient) QuerySync(query []byte) (res types.Result) {
reqres := cli.QueryAsync(query) reqres := cli.QueryAsync(query)
if cli.err != nil { if res := cli.checkErrGetResult(); res != nil {
return types.ErrInternalError.SetLog(cli.err.Error()) return *res
} }
resp := reqres.Response.GetQuery() resp := reqres.Response.GetQuery()
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
@ -270,8 +322,8 @@ func (cli *grpcClient) QuerySync(query []byte) (res types.Result) {
func (cli *grpcClient) CommitSync() (res types.Result) { func (cli *grpcClient) CommitSync() (res types.Result) {
reqres := cli.CommitAsync() reqres := cli.CommitAsync()
if cli.err != nil { if res := cli.checkErrGetResult(); res != nil {
return types.ErrInternalError.SetLog(cli.err.Error()) return *res
} }
resp := reqres.Response.GetCommit() resp := reqres.Response.GetCommit()
return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log} return types.Result{Code: resp.Code, Data: resp.Data, Log: resp.Log}
@ -279,24 +331,24 @@ func (cli *grpcClient) CommitSync() (res types.Result) {
func (cli *grpcClient) InitChainSync(validators []*types.Validator) (err error) { func (cli *grpcClient) InitChainSync(validators []*types.Validator) (err error) {
cli.InitChainAsync(validators) cli.InitChainAsync(validators)
if cli.err != nil { if err := cli.checkGetErr(); err != nil {
return cli.err return err
} }
return nil return nil
} }
func (cli *grpcClient) BeginBlockSync(height uint64) (err error) { func (cli *grpcClient) BeginBlockSync(height uint64) (err error) {
cli.BeginBlockAsync(height) cli.BeginBlockAsync(height)
if cli.err != nil { if err := cli.checkGetErr(); err != nil {
return cli.err return err
} }
return nil return nil
} }
func (cli *grpcClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) { func (cli *grpcClient) EndBlockSync(height uint64) (validators []*types.Validator, err error) {
reqres := cli.EndBlockAsync(height) reqres := cli.EndBlockAsync(height)
if cli.err != nil { if err := cli.checkGetErr(); err != nil {
return nil, cli.err return nil, err
} }
return reqres.Response.GetEndBlock().Diffs, nil return reqres.Response.GetEndBlock().Diffs, nil
} }

View File

@ -1,11 +1,14 @@
package tmspcli package tmspcli
import ( import (
types "github.com/tendermint/tmsp/types"
"sync" "sync"
. "github.com/tendermint/go-common"
types "github.com/tendermint/tmsp/types"
) )
type localClient struct { type localClient struct {
*BaseService
mtx *sync.Mutex mtx *sync.Mutex
types.Application types.Application
Callback Callback
@ -15,10 +18,12 @@ func NewLocalClient(mtx *sync.Mutex, app types.Application) *localClient {
if mtx == nil { if mtx == nil {
mtx = new(sync.Mutex) mtx = new(sync.Mutex)
} }
return &localClient{ cli := &localClient{
mtx: mtx, mtx: mtx,
Application: app, Application: app,
} }
cli.BaseService = NewBaseService(log, "localClient", cli)
return cli
} }
func (app *localClient) SetResponseCallback(cb Callback) { func (app *localClient) SetResponseCallback(cb Callback) {
@ -32,10 +37,6 @@ func (app *localClient) Error() error {
return nil return nil
} }
func (app *localClient) Stop() bool {
return true
}
func (app *localClient) FlushAsync() *ReqRes { func (app *localClient) FlushAsync() *ReqRes {
// Do nothing // Do nothing
return newLocalReqRes(types.ToRequestFlush(), nil) return newLocalReqRes(types.ToRequestFlush(), nil)

View File

@ -28,7 +28,6 @@ const flushThrottleMS = 20 // Don't wait longer than...
// with concurrent callers. // with concurrent callers.
type socketClient struct { type socketClient struct {
QuitService QuitService
sync.Mutex // [EB]: is this even used?
reqQueue chan *ReqRes reqQueue chan *ReqRes
flushTimer *ThrottleTimer flushTimer *ThrottleTimer
@ -40,6 +39,7 @@ type socketClient struct {
err error err error
reqSent *list.List reqSent *list.List
resCb func(*types.Request, *types.Response) // listens to all callbacks resCb func(*types.Request, *types.Response) // listens to all callbacks
} }
func NewSocketClient(addr string, mustConnect bool) (*socketClient, error) { func NewSocketClient(addr string, mustConnect bool) (*socketClient, error) {
@ -53,49 +53,70 @@ func NewSocketClient(addr string, mustConnect bool) (*socketClient, error) {
resCb: nil, resCb: nil,
} }
cli.QuitService = *NewQuitService(nil, "socketClient", cli) cli.QuitService = *NewQuitService(nil, "socketClient", cli)
_, err := cli.Start() // Just start it, it's confusing for callers to remember to start. _, err := cli.Start() // Just start it, it's confusing for callers to remember to start.
return cli, err return cli, err
} }
func (cli *socketClient) OnStart() error { func (cli *socketClient) OnStart() error {
cli.QuitService.OnStart() cli.QuitService.OnStart()
var err error
var conn net.Conn
RETRY_LOOP: RETRY_LOOP:
for { for {
conn, err := Connect(cli.addr) conn, err = Connect(cli.addr)
if err != nil { if err != nil {
if cli.mustConnect { if cli.mustConnect {
return err return err
} else { } else {
log.Warn(Fmt("tmsp.socketClient failed to connect to %v. Retrying...\n", cli.addr)) log.Warn(Fmt("tmsp.socketClient failed to connect to %v. Retrying...", cli.addr))
time.Sleep(time.Second * 3) time.Sleep(time.Second * 3)
continue RETRY_LOOP continue RETRY_LOOP
} }
} }
cli.conn = conn
go cli.sendRequestsRoutine(conn) go cli.sendRequestsRoutine(conn)
go cli.recvResponseRoutine(conn) go cli.recvResponseRoutine(conn)
return err
return nil
} }
return nil // never happens return nil // never happens
} }
func (cli *socketClient) OnStop() { func (cli *socketClient) OnStop() {
cli.QuitService.OnStop() cli.QuitService.OnStop()
cli.mtx.Lock()
defer cli.mtx.Unlock()
if cli.conn != nil { if cli.conn != nil {
cli.conn.Close() cli.conn.Close()
} }
cli.flushQueue() cli.flushQueue()
} }
func (cli *socketClient) flushQueue() { // Stop the client and set the error
LOOP: func (cli *socketClient) StopForError(err error) {
for { cli.mtx.Lock()
select { if !cli.IsRunning() {
case reqres := <-cli.reqQueue: return
reqres.Done()
default:
break LOOP
}
} }
if cli.err == nil {
cli.err = err
}
cli.mtx.Unlock()
log.Warn(Fmt("Stopping tmsp.socketClient for error: %v", err.Error()))
cli.Stop()
}
func (cli *socketClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
} }
// Set listener for all responses // Set listener for all responses
@ -106,29 +127,10 @@ func (cli *socketClient) SetResponseCallback(resCb Callback) {
cli.resCb = resCb cli.resCb = resCb
} }
func (cli *socketClient) StopForError(err error) {
if !cli.IsRunning() {
return
}
cli.mtx.Lock()
log.Warn(Fmt("Stopping tmsp.socketClient for error: %v\n", err.Error()))
if cli.err == nil {
cli.err = err
}
cli.mtx.Unlock()
cli.Stop()
}
func (cli *socketClient) Error() error {
cli.mtx.Lock()
defer cli.mtx.Unlock()
return cli.err
}
//---------------------------------------- //----------------------------------------
func (cli *socketClient) sendRequestsRoutine(conn net.Conn) { func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
w := bufio.NewWriter(conn) w := bufio.NewWriter(conn)
for { for {
select { select {
@ -144,14 +146,14 @@ func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
cli.willSendReq(reqres) cli.willSendReq(reqres)
err := types.WriteMessage(reqres.Request, w) err := types.WriteMessage(reqres.Request, w)
if err != nil { if err != nil {
cli.StopForError(err) cli.StopForError(fmt.Errorf("Error writing msg: %v", err))
return return
} }
// log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request) // log.Debug("Sent request", "requestType", reflect.TypeOf(reqres.Request), "request", reqres.Request)
if _, ok := reqres.Request.Value.(*types.Request_Flush); ok { if _, ok := reqres.Request.Value.(*types.Request_Flush); ok {
err = w.Flush() err = w.Flush()
if err != nil { if err != nil {
cli.StopForError(err) cli.StopForError(fmt.Errorf("Error flushing writer: %v", err))
return return
} }
} }
@ -160,6 +162,7 @@ func (cli *socketClient) sendRequestsRoutine(conn net.Conn) {
} }
func (cli *socketClient) recvResponseRoutine(conn net.Conn) { func (cli *socketClient) recvResponseRoutine(conn net.Conn) {
r := bufio.NewReader(conn) // Buffer reads r := bufio.NewReader(conn) // Buffer reads
for { for {
var res = &types.Response{} var res = &types.Response{}
@ -172,11 +175,13 @@ func (cli *socketClient) recvResponseRoutine(conn net.Conn) {
case *types.Response_Exception: case *types.Response_Exception:
// XXX After setting cli.err, release waiters (e.g. reqres.Done()) // XXX After setting cli.err, release waiters (e.g. reqres.Done())
cli.StopForError(errors.New(r.Exception.Error)) cli.StopForError(errors.New(r.Exception.Error))
return
default: default:
// log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res) // log.Debug("Received response", "responseType", reflect.TypeOf(res), "response", res)
err := cli.didRecvResponse(res) err := cli.didRecvResponse(res)
if err != nil { if err != nil {
cli.StopForError(err) cli.StopForError(err)
return
} }
} }
} }
@ -280,9 +285,8 @@ func (cli *socketClient) EchoSync(msg string) (res types.Result) {
func (cli *socketClient) FlushSync() error { func (cli *socketClient) FlushSync() error {
reqRes := cli.queueRequest(types.ToRequestFlush()) reqRes := cli.queueRequest(types.ToRequestFlush())
if reqRes == nil { if cli.err != nil {
return fmt.Errorf("Remote app is not running") return types.ErrInternalError.SetLog(cli.err.Error())
} }
reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here reqRes.Wait() // NOTE: if we don't flush the queue, its possible to get stuck here
return cli.err return cli.err
@ -378,10 +382,6 @@ func (cli *socketClient) EndBlockSync(height uint64) (validators []*types.Valida
//---------------------------------------- //----------------------------------------
func (cli *socketClient) queueRequest(req *types.Request) *ReqRes { func (cli *socketClient) queueRequest(req *types.Request) *ReqRes {
if !cli.IsRunning() {
return nil
}
reqres := NewReqRes(req) reqres := NewReqRes(req)
// TODO: set cli.err if reqQueue times out // TODO: set cli.err if reqQueue times out
@ -398,6 +398,18 @@ func (cli *socketClient) queueRequest(req *types.Request) *ReqRes {
return reqres return reqres
} }
func (cli *socketClient) flushQueue() {
LOOP:
for {
select {
case reqres := <-cli.reqQueue:
reqres.Done()
default:
break LOOP
}
}
}
//---------------------------------------- //----------------------------------------
func resMatchesReq(req *types.Request, res *types.Response) (ok bool) { func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {

View File

@ -1,93 +0,0 @@
package dummy
import (
"reflect"
"testing"
"time"
. "github.com/tendermint/go-common"
"github.com/tendermint/tmsp/server"
"github.com/tendermint/tmsp/types"
)
func TestStream(t *testing.T) {
numAppendTxs := 200000
// Start the listener
server, err := server.NewSocketServer("unix://test.sock", NewDummyApplication())
if err != nil {
Exit(err.Error())
}
defer server.Stop()
// Connect to the socket
conn, err := Connect("unix://test.sock")
if err != nil {
Exit(err.Error())
}
// Read response data
done := make(chan struct{})
go func() {
counter := 0
for {
var res = &types.Response{}
err := types.ReadMessage(conn, res)
if err != nil {
Exit(err.Error())
}
// Process response
switch r := res.Value.(type) {
case *types.Response_AppendTx:
counter += 1
if r.AppendTx.Code != types.CodeType_OK {
t.Error("AppendTx failed with ret_code", r.AppendTx.Code)
}
if counter > numAppendTxs {
t.Fatal("Too many AppendTx responses")
}
t.Log("response", counter)
if counter == numAppendTxs {
go func() {
time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow
close(done)
}()
}
case *types.Response_Flush:
// ignore
default:
t.Error("Unexpected response type", reflect.TypeOf(res.Value))
}
}
}()
// Write requests
for counter := 0; counter < numAppendTxs; counter++ {
// Send request
var req = types.ToRequestAppendTx([]byte("test"))
err := types.WriteMessage(req, conn)
if err != nil {
t.Fatal(err.Error())
}
// Sometimes send flush messages
if counter%123 == 0 {
t.Log("flush")
err := types.WriteMessage(types.ToRequestFlush(), conn)
if err != nil {
t.Fatal(err.Error())
}
}
}
// Send final flush message
err = types.WriteMessage(types.ToRequestFlush(), conn)
if err != nil {
t.Fatal(err.Error())
}
<-done
}

151
example/example_test.go Normal file
View File

@ -0,0 +1,151 @@
package nilapp
import (
"fmt"
"net"
"reflect"
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
. "github.com/tendermint/go-common"
"github.com/tendermint/tmsp/client"
"github.com/tendermint/tmsp/example/dummy"
nilapp "github.com/tendermint/tmsp/example/nil"
"github.com/tendermint/tmsp/server"
"github.com/tendermint/tmsp/types"
)
func TestDummy(t *testing.T) {
fmt.Println("### Testing Dummy")
testStream(t, dummy.NewDummyApplication())
}
func TestNilApp(t *testing.T) {
fmt.Println("### Testing NilApp")
testStream(t, nilapp.NewNilApplication())
}
func TestGRPC(t *testing.T) {
fmt.Println("### Testing GRPC")
testGRPCSync(t, types.NewGRPCApplication(nilapp.NewNilApplication()))
}
func testStream(t *testing.T, app types.Application) {
numAppendTxs := 200000
// Start the listener
server, err := server.NewSocketServer("unix://test.sock", app)
if err != nil {
Exit(Fmt("Error starting socket server: %v", err.Error()))
}
defer server.Stop()
// Connect to the socket
client, err := tmspcli.NewSocketClient("unix://test.sock", false)
if err != nil {
Exit(Fmt("Error starting socket client: %v", err.Error()))
}
client.Start()
defer client.Stop()
done := make(chan struct{})
counter := 0
client.SetResponseCallback(func(req *types.Request, res *types.Response) {
// Process response
switch r := res.Value.(type) {
case *types.Response_AppendTx:
counter += 1
if r.AppendTx.Code != types.CodeType_OK {
t.Error("AppendTx failed with ret_code", r.AppendTx.Code)
}
if counter > numAppendTxs {
t.Fatalf("Too many AppendTx responses. Got %d, expected %d", counter, numAppendTxs)
}
if counter == numAppendTxs {
go func() {
time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow
close(done)
}()
return
}
case *types.Response_Flush:
// ignore
default:
t.Error("Unexpected response type", reflect.TypeOf(res.Value))
}
})
// Write requests
for counter := 0; counter < numAppendTxs; counter++ {
// Send request
reqRes := client.AppendTxAsync([]byte("test"))
_ = reqRes
// check err ?
// Sometimes send flush messages
if counter%123 == 0 {
client.FlushAsync()
// check err ?
}
}
// Send final flush message
client.FlushAsync()
<-done
}
//-------------------------
// test grpc
func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
return Connect(addr)
}
func testGRPCSync(t *testing.T, app *types.GRPCApplication) {
numAppendTxs := 2000
// Start the listener
server, err := server.NewGRPCServer("unix://test.sock", app)
if err != nil {
Exit(Fmt("Error starting GRPC server: %v", err.Error()))
}
defer server.Stop()
// Connect to the socket
conn, err := grpc.Dial("unix://test.sock", grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
if err != nil {
Exit(Fmt("Error dialing GRPC server: %v", err.Error()))
}
defer conn.Close()
client := types.NewTMSPApplicationClient(conn)
// Write requests
for counter := 0; counter < numAppendTxs; counter++ {
// Send request
response, err := client.AppendTx(context.Background(), &types.RequestAppendTx{[]byte("test")})
if err != nil {
t.Fatalf("Error in GRPC AppendTx: %v", err.Error())
}
counter += 1
if response.Code != types.CodeType_OK {
t.Error("AppendTx failed with ret_code", response.Code)
}
if counter > numAppendTxs {
t.Fatal("Too many AppendTx responses")
}
t.Log("response", counter)
if counter == numAppendTxs {
go func() {
time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow
}()
}
}
}

View File

@ -1,148 +0,0 @@
package nilapp
import (
"net"
"reflect"
"testing"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
. "github.com/tendermint/go-common"
"github.com/tendermint/tmsp/server"
"github.com/tendermint/tmsp/types"
)
func TestStream(t *testing.T) {
numAppendTxs := 200000
// Start the listener
server, err := server.NewSocketServer("unix://test.sock", NewNilApplication())
if err != nil {
Exit(err.Error())
}
defer server.Stop()
// Connect to the socket
conn, err := Connect("unix://test.sock")
if err != nil {
Exit(err.Error())
}
// Read response data
done := make(chan struct{})
go func() {
counter := 0
for {
var res = &types.Response{}
err := types.ReadMessage(conn, res)
if err != nil {
Exit(err.Error())
}
// Process response
switch r := res.Value.(type) {
case *types.Response_AppendTx:
counter += 1
if r.AppendTx.Code != types.CodeType_OK {
t.Error("AppendTx failed with ret_code", r.AppendTx.Code)
}
if counter > numAppendTxs {
t.Fatal("Too many AppendTx responses")
}
t.Log("response", counter)
if counter == numAppendTxs {
go func() {
time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow
close(done)
}()
}
case *types.Response_Flush:
// ignore
default:
t.Error("Unexpected response type", reflect.TypeOf(res.Value))
}
}
}()
// Write requests
for counter := 0; counter < numAppendTxs; counter++ {
// Send request
var req = types.ToRequestAppendTx([]byte("test"))
err := types.WriteMessage(req, conn)
if err != nil {
t.Fatal(err.Error())
}
// Sometimes send flush messages
if counter%123 == 0 {
t.Log("flush")
err := types.WriteMessage(types.ToRequestFlush(), conn)
if err != nil {
t.Fatal(err.Error())
}
}
}
// Send final flush message
err = types.WriteMessage(types.ToRequestFlush(), conn)
if err != nil {
t.Fatal(err.Error())
}
<-done
}
//-------------------------
// test grpc
func dialerFunc(addr string, timeout time.Duration) (net.Conn, error) {
return Connect(addr)
}
func TestGRPCSync(t *testing.T) {
numAppendTxs := 2000
// Start the listener
server, err := server.NewGRPCServer("unix://test.sock", types.NewGRPCApplication(NewNilApplication()))
if err != nil {
Exit(err.Error())
}
defer server.Stop()
// Connect to the socket
conn, err := grpc.Dial("unix://test.sock", grpc.WithInsecure(), grpc.WithDialer(dialerFunc))
if err != nil {
Exit(err.Error())
}
defer conn.Close()
client := types.NewTMSPApplicationClient(conn)
// Write requests
for counter := 0; counter < numAppendTxs; counter++ {
// Send request
response, err := client.AppendTx(context.Background(), &types.RequestAppendTx{[]byte("test")})
if err != nil {
t.Fatal(err.Error())
}
counter += 1
if response.Code != types.CodeType_OK {
t.Error("AppendTx failed with ret_code", response.Code)
}
if counter > numAppendTxs {
t.Fatal("Too many AppendTx responses")
}
t.Log("response", counter)
if counter == numAppendTxs {
go func() {
time.Sleep(time.Second * 2) // Wait for a bit to allow counter overflow
}()
}
}
}

View File

@ -18,6 +18,7 @@ type GRPCServer struct {
proto string proto string
addr string addr string
listener net.Listener listener net.Listener
server *grpc.Server
app types.TMSPApplicationServer app types.TMSPApplicationServer
} }
@ -43,13 +44,13 @@ func (s *GRPCServer) OnStart() error {
return err return err
} }
s.listener = ln s.listener = ln
grpcServer := grpc.NewServer() s.server = grpc.NewServer()
types.RegisterTMSPApplicationServer(grpcServer, s.app) types.RegisterTMSPApplicationServer(s.server, s.app)
go grpcServer.Serve(ln) go s.server.Serve(s.listener)
return nil return nil
} }
func (s *GRPCServer) OnStop() { func (s *GRPCServer) OnStop() {
s.QuitService.OnStop() s.QuitService.OnStop()
s.listener.Close() s.server.Stop()
} }

7
server/log.go Normal file
View File

@ -0,0 +1,7 @@
package server
import (
"github.com/tendermint/go-logger"
)
var log = logger.New("module", "tmsp-server")

View File

@ -21,6 +21,10 @@ type SocketServer struct {
addr string addr string
listener net.Listener listener net.Listener
connsMtx sync.Mutex
conns map[int]net.Conn
nextConnID int
appMtx sync.Mutex appMtx sync.Mutex
app types.Application app types.Application
} }
@ -33,6 +37,7 @@ func NewSocketServer(protoAddr string, app types.Application) (Service, error) {
addr: addr, addr: addr,
listener: nil, listener: nil,
app: app, app: app,
conns: make(map[int]net.Conn),
} }
s.QuitService = *NewQuitService(nil, "TMSPServer", s) s.QuitService = *NewQuitService(nil, "TMSPServer", s)
_, err := s.Start() // Just start it _, err := s.Start() // Just start it
@ -53,6 +58,33 @@ func (s *SocketServer) OnStart() error {
func (s *SocketServer) OnStop() { func (s *SocketServer) OnStop() {
s.QuitService.OnStop() s.QuitService.OnStop()
s.listener.Close() s.listener.Close()
s.connsMtx.Lock()
for id, conn := range s.conns {
delete(s.conns, id)
conn.Close()
}
s.connsMtx.Unlock()
}
func (s *SocketServer) addConn(conn net.Conn) int {
s.connsMtx.Lock()
defer s.connsMtx.Unlock()
connID := s.nextConnID
s.nextConnID += 1
s.conns[connID] = conn
return connID
}
// deletes conn even if close errs
func (s *SocketServer) rmConn(connID int, conn net.Conn) error {
s.connsMtx.Lock()
defer s.connsMtx.Unlock()
delete(s.conns, connID)
return conn.Close()
} }
func (s *SocketServer) acceptConnectionsRoutine() { func (s *SocketServer) acceptConnectionsRoutine() {
@ -62,7 +94,7 @@ func (s *SocketServer) acceptConnectionsRoutine() {
// semaphore <- struct{}{} // semaphore <- struct{}{}
// Accept a connection // Accept a connection
fmt.Println("Waiting for new connection...") log.Notice("Waiting for new connection...")
conn, err := s.listener.Accept() conn, err := s.listener.Accept()
if err != nil { if err != nil {
if !s.IsRunning() { if !s.IsRunning() {
@ -70,9 +102,11 @@ func (s *SocketServer) acceptConnectionsRoutine() {
} }
Exit("Failed to accept connection: " + err.Error()) Exit("Failed to accept connection: " + err.Error())
} else { } else {
fmt.Println("Accepted a new connection") log.Notice("Accepted a new connection")
} }
connID := s.addConn(conn)
closeConn := make(chan error, 2) // Push to signal connection closed closeConn := make(chan error, 2) // Push to signal connection closed
responses := make(chan *types.Response, 1000) // A channel to buffer responses responses := make(chan *types.Response, 1000) // A channel to buffer responses
@ -84,16 +118,20 @@ func (s *SocketServer) acceptConnectionsRoutine() {
go func() { go func() {
// Wait until signal to close connection // Wait until signal to close connection
errClose := <-closeConn errClose := <-closeConn
if errClose != nil { if err == io.EOF {
fmt.Printf("Connection error: %v\n", errClose) log.Warn("Connection was closed by client")
return // is this correct? the conn is closed?
} else if errClose != nil {
log.Warn("Connection error", "error", errClose)
} else { } else {
fmt.Println("Connection was closed.") // never happens
log.Warn("Connection was closed.")
} }
// Close the connection // Close the connection
err := conn.Close() err := s.rmConn(connID, conn)
if err != nil { if err != nil {
fmt.Printf("Error in closing connection: %v\n", err) log.Warn("Error in closing connection", "error", err)
} }
// <-semaphore // <-semaphore
@ -111,9 +149,9 @@ func (s *SocketServer) handleRequests(closeConn chan error, conn net.Conn, respo
err := types.ReadMessage(bufReader, req) err := types.ReadMessage(bufReader, req)
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
closeConn <- fmt.Errorf("Connection closed by client") closeConn <- err
} else { } else {
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) closeConn <- fmt.Errorf("Error reading message: %v", err.Error())
} }
return return
} }
@ -176,13 +214,13 @@ func (s *SocketServer) handleResponses(closeConn chan error, responses <-chan *t
var res = <-responses var res = <-responses
err := types.WriteMessage(res, bufWriter) err := types.WriteMessage(res, bufWriter)
if err != nil { if err != nil {
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) closeConn <- fmt.Errorf("Error writing message: %v", err.Error())
return return
} }
if _, ok := res.Value.(*types.Response_Flush); ok { if _, ok := res.Value.(*types.Response_Flush); ok {
err = bufWriter.Flush() err = bufWriter.Flush()
if err != nil { if err != nil {
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error()) closeConn <- fmt.Errorf("Error flushing write buffer: %v", err.Error())
return return
} }
} }

View File

@ -9,7 +9,7 @@ function testExample() {
$APP &> /dev/null & $APP &> /dev/null &
sleep 2 sleep 2
tmsp-cli --verbose batch < $INPUT > "${INPUT}.out.new" tmsp-cli --verbose batch < $INPUT > "${INPUT}.out.new"
killall "$APP" > /dev/null killall "$APP" &> /dev/null
pre=`shasum < "${INPUT}.out"` pre=`shasum < "${INPUT}.out"`
post=`shasum < "${INPUT}.out.new"` post=`shasum < "${INPUT}.out.new"`