rpc: wait for rpc servers to be available in tests

This commit is contained in:
Ethan Buchman
2017-11-14 21:51:49 +00:00
parent 30f675aafa
commit 194712fd3b
4 changed files with 124 additions and 26 deletions

View File

@ -10,6 +10,11 @@ import (
type broadcastAPI struct { type broadcastAPI struct {
} }
func (bapi *broadcastAPI) Ping(ctx context.Context, req *RequestPing) (*ResponsePing, error) {
// dummy so we can check if the server is up
return &ResponsePing{}, nil
}
func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcastTx) (*ResponseBroadcastTx, error) { func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcastTx) (*ResponseBroadcastTx, error) {
res, err := core.BroadcastTxCommit(req.Tx) res, err := core.BroadcastTxCommit(req.Tx)
if err != nil { if err != nil {

View File

@ -9,7 +9,9 @@ It is generated from these files:
types.proto types.proto
It has these top-level messages: It has these top-level messages:
RequestPing
RequestBroadcastTx RequestBroadcastTx
ResponsePing
ResponseBroadcastTx ResponseBroadcastTx
*/ */
package core_grpc package core_grpc
@ -35,6 +37,14 @@ var _ = math.Inf
// proto package needs to be updated. // proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type RequestPing struct {
}
func (m *RequestPing) Reset() { *m = RequestPing{} }
func (m *RequestPing) String() string { return proto.CompactTextString(m) }
func (*RequestPing) ProtoMessage() {}
func (*RequestPing) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
type RequestBroadcastTx struct { type RequestBroadcastTx struct {
Tx []byte `protobuf:"bytes,1,opt,name=tx,proto3" json:"tx,omitempty"` Tx []byte `protobuf:"bytes,1,opt,name=tx,proto3" json:"tx,omitempty"`
} }
@ -42,7 +52,7 @@ type RequestBroadcastTx struct {
func (m *RequestBroadcastTx) Reset() { *m = RequestBroadcastTx{} } func (m *RequestBroadcastTx) Reset() { *m = RequestBroadcastTx{} }
func (m *RequestBroadcastTx) String() string { return proto.CompactTextString(m) } func (m *RequestBroadcastTx) String() string { return proto.CompactTextString(m) }
func (*RequestBroadcastTx) ProtoMessage() {} func (*RequestBroadcastTx) ProtoMessage() {}
func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
func (m *RequestBroadcastTx) GetTx() []byte { func (m *RequestBroadcastTx) GetTx() []byte {
if m != nil { if m != nil {
@ -51,6 +61,14 @@ func (m *RequestBroadcastTx) GetTx() []byte {
return nil return nil
} }
type ResponsePing struct {
}
func (m *ResponsePing) Reset() { *m = ResponsePing{} }
func (m *ResponsePing) String() string { return proto.CompactTextString(m) }
func (*ResponsePing) ProtoMessage() {}
func (*ResponsePing) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
type ResponseBroadcastTx struct { type ResponseBroadcastTx struct {
CheckTx *types.ResponseCheckTx `protobuf:"bytes,1,opt,name=check_tx,json=checkTx" json:"check_tx,omitempty"` CheckTx *types.ResponseCheckTx `protobuf:"bytes,1,opt,name=check_tx,json=checkTx" json:"check_tx,omitempty"`
DeliverTx *types.ResponseDeliverTx `protobuf:"bytes,2,opt,name=deliver_tx,json=deliverTx" json:"deliver_tx,omitempty"` DeliverTx *types.ResponseDeliverTx `protobuf:"bytes,2,opt,name=deliver_tx,json=deliverTx" json:"deliver_tx,omitempty"`
@ -59,7 +77,7 @@ type ResponseBroadcastTx struct {
func (m *ResponseBroadcastTx) Reset() { *m = ResponseBroadcastTx{} } func (m *ResponseBroadcastTx) Reset() { *m = ResponseBroadcastTx{} }
func (m *ResponseBroadcastTx) String() string { return proto.CompactTextString(m) } func (m *ResponseBroadcastTx) String() string { return proto.CompactTextString(m) }
func (*ResponseBroadcastTx) ProtoMessage() {} func (*ResponseBroadcastTx) ProtoMessage() {}
func (*ResponseBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } func (*ResponseBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} }
func (m *ResponseBroadcastTx) GetCheckTx() *types.ResponseCheckTx { func (m *ResponseBroadcastTx) GetCheckTx() *types.ResponseCheckTx {
if m != nil { if m != nil {
@ -76,7 +94,9 @@ func (m *ResponseBroadcastTx) GetDeliverTx() *types.ResponseDeliverTx {
} }
func init() { func init() {
proto.RegisterType((*RequestPing)(nil), "core_grpc.RequestPing")
proto.RegisterType((*RequestBroadcastTx)(nil), "core_grpc.RequestBroadcastTx") proto.RegisterType((*RequestBroadcastTx)(nil), "core_grpc.RequestBroadcastTx")
proto.RegisterType((*ResponsePing)(nil), "core_grpc.ResponsePing")
proto.RegisterType((*ResponseBroadcastTx)(nil), "core_grpc.ResponseBroadcastTx") proto.RegisterType((*ResponseBroadcastTx)(nil), "core_grpc.ResponseBroadcastTx")
} }
@ -91,6 +111,7 @@ const _ = grpc.SupportPackageIsVersion4
// Client API for BroadcastAPI service // Client API for BroadcastAPI service
type BroadcastAPIClient interface { type BroadcastAPIClient interface {
Ping(ctx context.Context, in *RequestPing, opts ...grpc.CallOption) (*ResponsePing, error)
BroadcastTx(ctx context.Context, in *RequestBroadcastTx, opts ...grpc.CallOption) (*ResponseBroadcastTx, error) BroadcastTx(ctx context.Context, in *RequestBroadcastTx, opts ...grpc.CallOption) (*ResponseBroadcastTx, error)
} }
@ -102,6 +123,15 @@ func NewBroadcastAPIClient(cc *grpc.ClientConn) BroadcastAPIClient {
return &broadcastAPIClient{cc} return &broadcastAPIClient{cc}
} }
func (c *broadcastAPIClient) Ping(ctx context.Context, in *RequestPing, opts ...grpc.CallOption) (*ResponsePing, error) {
out := new(ResponsePing)
err := grpc.Invoke(ctx, "/core_grpc.BroadcastAPI/Ping", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *broadcastAPIClient) BroadcastTx(ctx context.Context, in *RequestBroadcastTx, opts ...grpc.CallOption) (*ResponseBroadcastTx, error) { func (c *broadcastAPIClient) BroadcastTx(ctx context.Context, in *RequestBroadcastTx, opts ...grpc.CallOption) (*ResponseBroadcastTx, error) {
out := new(ResponseBroadcastTx) out := new(ResponseBroadcastTx)
err := grpc.Invoke(ctx, "/core_grpc.BroadcastAPI/BroadcastTx", in, out, c.cc, opts...) err := grpc.Invoke(ctx, "/core_grpc.BroadcastAPI/BroadcastTx", in, out, c.cc, opts...)
@ -114,6 +144,7 @@ func (c *broadcastAPIClient) BroadcastTx(ctx context.Context, in *RequestBroadca
// Server API for BroadcastAPI service // Server API for BroadcastAPI service
type BroadcastAPIServer interface { type BroadcastAPIServer interface {
Ping(context.Context, *RequestPing) (*ResponsePing, error)
BroadcastTx(context.Context, *RequestBroadcastTx) (*ResponseBroadcastTx, error) BroadcastTx(context.Context, *RequestBroadcastTx) (*ResponseBroadcastTx, error)
} }
@ -121,6 +152,24 @@ func RegisterBroadcastAPIServer(s *grpc.Server, srv BroadcastAPIServer) {
s.RegisterService(&_BroadcastAPI_serviceDesc, srv) s.RegisterService(&_BroadcastAPI_serviceDesc, srv)
} }
func _BroadcastAPI_Ping_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestPing)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(BroadcastAPIServer).Ping(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/core_grpc.BroadcastAPI/Ping",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(BroadcastAPIServer).Ping(ctx, req.(*RequestPing))
}
return interceptor(ctx, in, info, handler)
}
func _BroadcastAPI_BroadcastTx_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { func _BroadcastAPI_BroadcastTx_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RequestBroadcastTx) in := new(RequestBroadcastTx)
if err := dec(in); err != nil { if err := dec(in); err != nil {
@ -143,6 +192,10 @@ var _BroadcastAPI_serviceDesc = grpc.ServiceDesc{
ServiceName: "core_grpc.BroadcastAPI", ServiceName: "core_grpc.BroadcastAPI",
HandlerType: (*BroadcastAPIServer)(nil), HandlerType: (*BroadcastAPIServer)(nil),
Methods: []grpc.MethodDesc{ Methods: []grpc.MethodDesc{
{
MethodName: "Ping",
Handler: _BroadcastAPI_Ping_Handler,
},
{ {
MethodName: "BroadcastTx", MethodName: "BroadcastTx",
Handler: _BroadcastAPI_BroadcastTx_Handler, Handler: _BroadcastAPI_BroadcastTx_Handler,
@ -155,20 +208,22 @@ var _BroadcastAPI_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("types.proto", fileDescriptor0) } func init() { proto.RegisterFile("types.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{ var fileDescriptor0 = []byte{
// 226 bytes of a gzipped FileDescriptorProto // 264 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0xa9, 0x2c, 0x48, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0xa9, 0x2c, 0x48,
0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4c, 0xce, 0x2f, 0x4a, 0x8d, 0x4f, 0x2f, 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4c, 0xce, 0x2f, 0x4a, 0x8d, 0x4f, 0x2f,
0x2a, 0x48, 0x96, 0xd2, 0x49, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x2f, 0x2a, 0x48, 0x96, 0xd2, 0x49, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x2f,
0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x2f, 0xc9, 0x2d, 0x2e, 0xd0, 0x07, 0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x4f, 0x4c, 0x4a, 0xce, 0xd4, 0x07,
0x6b, 0xd1, 0x47, 0xd2, 0xa8, 0xa4, 0xc2, 0x25, 0x14, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0xe2, 0x6b, 0xd1, 0x47, 0xd2, 0xa8, 0xc4, 0xcb, 0xc5, 0x1d, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0x12,
0x54, 0x94, 0x9f, 0x98, 0x92, 0x9c, 0x58, 0x5c, 0x12, 0x52, 0x21, 0xc4, 0xc7, 0xc5, 0x54, 0x52, 0x90, 0x99, 0x97, 0xae, 0xa4, 0xc2, 0x25, 0x04, 0xe5, 0x3a, 0x15, 0xe5, 0x27, 0xa6, 0x24, 0x27,
0x21, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x13, 0xc4, 0x54, 0x52, 0xa1, 0x54, 0xc7, 0x25, 0x1c, 0x94, 0x16, 0x97, 0x84, 0x54, 0x08, 0xf1, 0x71, 0x31, 0x95, 0x54, 0x48, 0x30, 0x2a, 0x30, 0x6a, 0xf0,
0x5a, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x8a, 0xac, 0xcc, 0x90, 0x8b, 0x23, 0x39, 0x23, 0x35, 0x39, 0x04, 0x31, 0x95, 0x54, 0x28, 0xf1, 0x71, 0xf1, 0x04, 0xa5, 0x16, 0x17, 0xe4, 0xe7, 0x15, 0xa7,
0x3b, 0x1e, 0xaa, 0x98, 0xdb, 0x48, 0x4c, 0x0f, 0x62, 0x38, 0x4c, 0xb5, 0x33, 0x48, 0x3a, 0xa4, 0x82, 0x75, 0x35, 0x32, 0x72, 0x09, 0xc3, 0x04, 0x90, 0xf5, 0x19, 0x72, 0x71, 0x24, 0x67, 0xa4,
0x22, 0x88, 0x3d, 0x19, 0xc2, 0x10, 0x32, 0xe1, 0xe2, 0x4c, 0x2c, 0x28, 0x48, 0xcd, 0x4b, 0x01, 0x26, 0x67, 0xc7, 0x43, 0x75, 0x73, 0x1b, 0x89, 0xe9, 0x41, 0x2c, 0x87, 0xa9, 0x76, 0x06, 0x49,
0xe9, 0x61, 0x02, 0xeb, 0x11, 0x47, 0xd3, 0xe3, 0x08, 0x96, 0x0f, 0xa9, 0x08, 0xe2, 0x48, 0x84, 0x87, 0x54, 0x04, 0xb1, 0x27, 0x43, 0x18, 0x42, 0xe6, 0x5c, 0x5c, 0x29, 0xa9, 0x39, 0x99, 0x65,
0xb2, 0x8c, 0x62, 0xb8, 0x78, 0xe0, 0xf6, 0x3a, 0x06, 0x78, 0x0a, 0xf9, 0x70, 0x71, 0x23, 0xbb, 0xa9, 0x45, 0x20, 0x4d, 0x4c, 0x60, 0x4d, 0x12, 0x68, 0x9a, 0x5c, 0x20, 0x0a, 0x42, 0x2a, 0x82,
0x43, 0x56, 0x0f, 0xee, 0x7d, 0x3d, 0x4c, 0xdf, 0x48, 0xc9, 0xa1, 0x48, 0x63, 0x78, 0x23, 0x89, 0x38, 0x53, 0x60, 0x4c, 0xa3, 0xa9, 0x8c, 0x5c, 0x3c, 0x70, 0xbb, 0x1d, 0x03, 0x3c, 0x85, 0xcc,
0x0d, 0x1c, 0x14, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x68, 0x73, 0x87, 0xb0, 0x52, 0x01, 0xb9, 0x58, 0x40, 0x8e, 0x13, 0x12, 0xd3, 0x83, 0x87, 0x8d, 0x1e, 0x92, 0x57, 0xa5, 0xc4, 0x51,
0x00, 0x00, 0xc4, 0x11, 0xbe, 0x11, 0xf2, 0xe1, 0xe2, 0x46, 0xf6, 0x84, 0x2c, 0xa6, 0x7e, 0x24, 0x69, 0x29,
0x39, 0x2c, 0xc6, 0x20, 0xc9, 0x27, 0xb1, 0x81, 0xc3, 0xd9, 0x18, 0x10, 0x00, 0x00, 0xff, 0xff,
0x92, 0x29, 0xd9, 0x42, 0xaf, 0x01, 0x00, 0x00,
} }

View File

@ -1,7 +1,7 @@
syntax = "proto3"; syntax = "proto3";
package core_grpc; package core_grpc;
import "github.com/tendermint/abci/blob/master/types/types.proto"; import "github.com/tendermint/abci/types/types.proto";
//---------------------------------------- //----------------------------------------
// Message types // Message types
@ -9,6 +9,9 @@ import "github.com/tendermint/abci/blob/master/types/types.proto";
//---------------------------------------- //----------------------------------------
// Request types // Request types
message RequestPing {
}
message RequestBroadcastTx { message RequestBroadcastTx {
bytes tx = 1; bytes tx = 1;
} }
@ -16,6 +19,9 @@ message RequestBroadcastTx {
//---------------------------------------- //----------------------------------------
// Response types // Response types
message ResponsePing{
}
message ResponseBroadcastTx{ message ResponseBroadcastTx{
types.ResponseCheckTx check_tx = 1; types.ResponseCheckTx check_tx = 1;
types.ResponseDeliverTx deliver_tx = 2; types.ResponseDeliverTx deliver_tx = 2;
@ -25,5 +31,6 @@ message ResponseBroadcastTx{
// Service Definition // Service Definition
service BroadcastAPI { service BroadcastAPI {
rpc Ping(RequestPing) returns (ResponsePing) ;
rpc BroadcastTx(RequestBroadcastTx) returns (ResponseBroadcastTx) ; rpc BroadcastTx(RequestBroadcastTx) returns (ResponseBroadcastTx) ;
} }

View File

@ -1,6 +1,7 @@
package rpctest package rpctest
import ( import (
"context"
"fmt" "fmt"
"math/rand" "math/rand"
"os" "os"
@ -13,11 +14,35 @@ import (
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
nm "github.com/tendermint/tendermint/node" nm "github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
core_grpc "github.com/tendermint/tendermint/rpc/grpc" core_grpc "github.com/tendermint/tendermint/rpc/grpc"
rpcclient "github.com/tendermint/tendermint/rpc/lib/client"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
var config *cfg.Config var globalConfig *cfg.Config
func waitForRPC() {
laddr := GetConfig().RPC.ListenAddress
client := rpcclient.NewJSONRPCClient(laddr)
result := new(ctypes.ResultStatus)
for {
_, err := client.Call("status", map[string]interface{}{}, result)
if err == nil {
return
}
}
}
func waitForGRPC() {
client := GetGRPCClient()
for {
_, err := client.Ping(context.Background(), &core_grpc.RequestPing{})
if err == nil {
return
}
}
}
// f**ing long, but unique for each test // f**ing long, but unique for each test
func makePathname() string { func makePathname() string {
@ -46,21 +71,21 @@ func makeAddrs() (string, string, string) {
// GetConfig returns a config for the test cases as a singleton // GetConfig returns a config for the test cases as a singleton
func GetConfig() *cfg.Config { func GetConfig() *cfg.Config {
if config == nil { if globalConfig == nil {
pathname := makePathname() pathname := makePathname()
config = cfg.ResetTestRoot(pathname) globalConfig = cfg.ResetTestRoot(pathname)
// and we use random ports to run in parallel // and we use random ports to run in parallel
tm, rpc, grpc := makeAddrs() tm, rpc, grpc := makeAddrs()
config.P2P.ListenAddress = tm globalConfig.P2P.ListenAddress = tm
config.RPC.ListenAddress = rpc globalConfig.RPC.ListenAddress = rpc
config.RPC.GRPCListenAddress = grpc globalConfig.RPC.GRPCListenAddress = grpc
} }
return config return globalConfig
} }
func GetGRPCClient() core_grpc.BroadcastAPIClient { func GetGRPCClient() core_grpc.BroadcastAPIClient {
grpcAddr := config.RPC.GRPCListenAddress grpcAddr := globalConfig.RPC.GRPCListenAddress
return core_grpc.StartGRPCClient(grpcAddr) return core_grpc.StartGRPCClient(grpcAddr)
} }
@ -68,7 +93,13 @@ func GetGRPCClient() core_grpc.BroadcastAPIClient {
func StartTendermint(app abci.Application) *nm.Node { func StartTendermint(app abci.Application) *nm.Node {
node := NewTendermint(app) node := NewTendermint(app)
node.Start() node.Start()
// wait for rpc
waitForRPC()
waitForGRPC()
fmt.Println("Tendermint running!") fmt.Println("Tendermint running!")
return node return node
} }