diff --git a/rpc/core/mempool.go b/rpc/core/mempool.go index ad599d22..0a57167e 100644 --- a/rpc/core/mempool.go +++ b/rpc/core/mempool.go @@ -39,16 +39,9 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { }, nil } -// CONTRACT: returns error==nil iff the tx is included in a block. -// -// If CheckTx fails, return with the response from CheckTx AND an error. -// Else, block until the tx is included in a block, -// and return the result of AppendTx (with no error). -// Even if AppendTx fails, so long as the tx is included in a block this function -// will not return an error - it is the caller's responsibility to check res.Code. -// The function times out after five minutes and returns the result of CheckTx and an error. -// TODO: smarter timeout logic or someway to cancel (tx not getting committed is a sign of a larger problem!) -func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { +// CONTRACT: only returns error if mempool.BroadcastTx errs (ie. problem with the app) +// or if we timeout waiting for tx to commit +func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTxCommit, error) { // subscribe to tx being committed in block appendTxResCh := make(chan types.EventDataTx, 1) @@ -66,13 +59,12 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { } checkTxRes := <-checkTxResCh checkTxR := checkTxRes.GetCheckTx() - if r := checkTxR; r.Code != tmsp.CodeType_OK { + if checkTxR.Code != tmsp.CodeType_OK { // CheckTx failed! - return &ctypes.ResultBroadcastTx{ - Code: r.Code, - Data: r.Data, - Log: r.Log, - }, fmt.Errorf("Check tx failed with non-zero code: %s. Data: %X; Log: %s", r.Code.String(), r.Data, r.Log) + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: checkTxR, + AppendTx: nil, + }, nil } // Wait for the tx to be included in a block, @@ -81,19 +73,15 @@ func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { select { case appendTxRes := <-appendTxResCh: // The tx was included in a block. - // NOTE we don't return an error regardless of the AppendTx code; - // clients must check this to see if they need to send a new tx! - return &ctypes.ResultBroadcastTx{ - Code: appendTxRes.Code, - Data: appendTxRes.Result, - Log: appendTxRes.Log, + appendTxR := appendTxRes.GetAppendTx() + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: checkTxR, + AppendTx: appendTxR, }, nil case <-timer.C: - r := checkTxR - return &ctypes.ResultBroadcastTx{ - Code: r.Code, - Data: r.Data, - Log: r.Log, + return &ctypes.ResultBroadcastTxCommit{ + CheckTx: checkTxR, + AppendTx: nil, }, fmt.Errorf("Timed out waiting for transaction to be included in a block") } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index f5f6bae0..0befac67 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -63,6 +63,11 @@ type ResultBroadcastTx struct { Log string `json:"log"` } +type ResultBroadcastTxCommit struct { + CheckTx *tmsp.ResponseCheckTx `json:"check_tx"` + AppendTx *tmsp.ResponseAppendTx `json:"append_tx"` +} + type ResultUnconfirmedTxs struct { N int `json:"n_txs"` Txs []types.Tx `json:"txs"` @@ -115,8 +120,9 @@ const ( ResultTypeDumpConsensusState = byte(0x41) // 0x6 bytes are for txs / the application - ResultTypeBroadcastTx = byte(0x60) - ResultTypeUnconfirmedTxs = byte(0x61) + ResultTypeBroadcastTx = byte(0x60) + ResultTypeUnconfirmedTxs = byte(0x61) + ResultTypeBroadcastTxCommit = byte(0x62) // 0x7 bytes are for querying the application ResultTypeTMSPQuery = byte(0x70) @@ -151,6 +157,7 @@ var _ = wire.RegisterInterface( wire.ConcreteType{&ResultValidators{}, ResultTypeValidators}, wire.ConcreteType{&ResultDumpConsensusState{}, ResultTypeDumpConsensusState}, wire.ConcreteType{&ResultBroadcastTx{}, ResultTypeBroadcastTx}, + wire.ConcreteType{&ResultBroadcastTxCommit{}, ResultTypeBroadcastTxCommit}, wire.ConcreteType{&ResultUnconfirmedTxs{}, ResultTypeUnconfirmedTxs}, wire.ConcreteType{&ResultSubscribe{}, ResultTypeSubscribe}, wire.ConcreteType{&ResultUnsubscribe{}, ResultTypeUnsubscribe}, diff --git a/rpc/grpc/api.go b/rpc/grpc/api.go index 19050054..c8b8dce7 100644 --- a/rpc/grpc/api.go +++ b/rpc/grpc/api.go @@ -11,8 +11,8 @@ type broadcastAPI struct { func (bapi *broadcastAPI) BroadcastTx(ctx context.Context, req *RequestBroadcastTx) (*ResponseBroadcastTx, error) { res, err := core.BroadcastTxCommit(req.Tx) - if res == nil { + if err != nil { return nil, err } - return &ResponseBroadcastTx{uint64(res.Code), res.Data, res.Log}, err + return &ResponseBroadcastTx{res.CheckTx, res.AppendTx}, nil } diff --git a/rpc/grpc/compile.sh b/rpc/grpc/compile.sh new file mode 100644 index 00000000..2c4629c8 --- /dev/null +++ b/rpc/grpc/compile.sh @@ -0,0 +1,3 @@ +#! /bin/bash + +protoc --go_out=plugins=grpc:. -I $GOPATH/src/ -I . types.proto diff --git a/rpc/grpc/types.pb.go b/rpc/grpc/types.pb.go index 51dfb697..06ff3f87 100644 --- a/rpc/grpc/types.pb.go +++ b/rpc/grpc/types.pb.go @@ -9,9 +9,7 @@ It is generated from these files: types.proto It has these top-level messages: - Request RequestBroadcastTx - Response ResponseBroadcastTx */ package core_grpc @@ -19,6 +17,7 @@ package core_grpc import proto "github.com/golang/protobuf/proto" import fmt "fmt" import math "math" +import types "github.com/tendermint/tmsp/types" import ( context "golang.org/x/net/context" @@ -36,96 +35,6 @@ var _ = math.Inf // proto package needs to be updated. const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package -type Request struct { - // Types that are valid to be assigned to Value: - // *Request_BroadcastTx - Value isRequest_Value `protobuf_oneof:"value"` -} - -func (m *Request) Reset() { *m = Request{} } -func (m *Request) String() string { return proto.CompactTextString(m) } -func (*Request) ProtoMessage() {} -func (*Request) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } - -type isRequest_Value interface { - isRequest_Value() -} - -type Request_BroadcastTx struct { - BroadcastTx *RequestBroadcastTx `protobuf:"bytes,1,opt,name=broadcast_tx,json=broadcastTx,oneof"` -} - -func (*Request_BroadcastTx) isRequest_Value() {} - -func (m *Request) GetValue() isRequest_Value { - if m != nil { - return m.Value - } - return nil -} - -func (m *Request) GetBroadcastTx() *RequestBroadcastTx { - if x, ok := m.GetValue().(*Request_BroadcastTx); ok { - return x.BroadcastTx - } - return nil -} - -// XXX_OneofFuncs is for the internal use of the proto package. -func (*Request) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { - return _Request_OneofMarshaler, _Request_OneofUnmarshaler, _Request_OneofSizer, []interface{}{ - (*Request_BroadcastTx)(nil), - } -} - -func _Request_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { - m := msg.(*Request) - // value - switch x := m.Value.(type) { - case *Request_BroadcastTx: - b.EncodeVarint(1<<3 | proto.WireBytes) - if err := b.EncodeMessage(x.BroadcastTx); err != nil { - return err - } - case nil: - default: - return fmt.Errorf("Request.Value has unexpected type %T", x) - } - return nil -} - -func _Request_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { - m := msg.(*Request) - switch tag { - case 1: // value.broadcast_tx - if wire != proto.WireBytes { - return true, proto.ErrInternalBadWireType - } - msg := new(RequestBroadcastTx) - err := b.DecodeMessage(msg) - m.Value = &Request_BroadcastTx{msg} - return true, err - default: - return false, nil - } -} - -func _Request_OneofSizer(msg proto.Message) (n int) { - m := msg.(*Request) - // value - switch x := m.Value.(type) { - case *Request_BroadcastTx: - s := proto.Size(x.BroadcastTx) - n += proto.SizeVarint(1<<3 | proto.WireBytes) - n += proto.SizeVarint(uint64(s)) - n += s - case nil: - default: - panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) - } - return n -} - type RequestBroadcastTx struct { Tx []byte `protobuf:"bytes,1,opt,name=tx,proto3" json:"tx,omitempty"` } @@ -133,113 +42,34 @@ type RequestBroadcastTx struct { func (m *RequestBroadcastTx) Reset() { *m = RequestBroadcastTx{} } func (m *RequestBroadcastTx) String() string { return proto.CompactTextString(m) } func (*RequestBroadcastTx) ProtoMessage() {} -func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } - -type Response struct { - // Types that are valid to be assigned to Value: - // *Response_BroadcastTx - Value isResponse_Value `protobuf_oneof:"value"` -} - -func (m *Response) Reset() { *m = Response{} } -func (m *Response) String() string { return proto.CompactTextString(m) } -func (*Response) ProtoMessage() {} -func (*Response) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } - -type isResponse_Value interface { - isResponse_Value() -} - -type Response_BroadcastTx struct { - BroadcastTx *ResponseBroadcastTx `protobuf:"bytes,1,opt,name=broadcast_tx,json=broadcastTx,oneof"` -} - -func (*Response_BroadcastTx) isResponse_Value() {} - -func (m *Response) GetValue() isResponse_Value { - if m != nil { - return m.Value - } - return nil -} - -func (m *Response) GetBroadcastTx() *ResponseBroadcastTx { - if x, ok := m.GetValue().(*Response_BroadcastTx); ok { - return x.BroadcastTx - } - return nil -} - -// XXX_OneofFuncs is for the internal use of the proto package. -func (*Response) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { - return _Response_OneofMarshaler, _Response_OneofUnmarshaler, _Response_OneofSizer, []interface{}{ - (*Response_BroadcastTx)(nil), - } -} - -func _Response_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { - m := msg.(*Response) - // value - switch x := m.Value.(type) { - case *Response_BroadcastTx: - b.EncodeVarint(1<<3 | proto.WireBytes) - if err := b.EncodeMessage(x.BroadcastTx); err != nil { - return err - } - case nil: - default: - return fmt.Errorf("Response.Value has unexpected type %T", x) - } - return nil -} - -func _Response_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { - m := msg.(*Response) - switch tag { - case 1: // value.broadcast_tx - if wire != proto.WireBytes { - return true, proto.ErrInternalBadWireType - } - msg := new(ResponseBroadcastTx) - err := b.DecodeMessage(msg) - m.Value = &Response_BroadcastTx{msg} - return true, err - default: - return false, nil - } -} - -func _Response_OneofSizer(msg proto.Message) (n int) { - m := msg.(*Response) - // value - switch x := m.Value.(type) { - case *Response_BroadcastTx: - s := proto.Size(x.BroadcastTx) - n += proto.SizeVarint(1<<3 | proto.WireBytes) - n += proto.SizeVarint(uint64(s)) - n += s - case nil: - default: - panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) - } - return n -} +func (*RequestBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } type ResponseBroadcastTx struct { - Code uint64 `protobuf:"varint,1,opt,name=code" json:"code,omitempty"` - Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` - Log string `protobuf:"bytes,3,opt,name=log" json:"log,omitempty"` + CheckTx *types.ResponseCheckTx `protobuf:"bytes,1,opt,name=check_tx,json=checkTx" json:"check_tx,omitempty"` + AppendTx *types.ResponseAppendTx `protobuf:"bytes,2,opt,name=append_tx,json=appendTx" json:"append_tx,omitempty"` } func (m *ResponseBroadcastTx) Reset() { *m = ResponseBroadcastTx{} } func (m *ResponseBroadcastTx) String() string { return proto.CompactTextString(m) } func (*ResponseBroadcastTx) ProtoMessage() {} -func (*ResponseBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } +func (*ResponseBroadcastTx) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } + +func (m *ResponseBroadcastTx) GetCheckTx() *types.ResponseCheckTx { + if m != nil { + return m.CheckTx + } + return nil +} + +func (m *ResponseBroadcastTx) GetAppendTx() *types.ResponseAppendTx { + if m != nil { + return m.AppendTx + } + return nil +} func init() { - proto.RegisterType((*Request)(nil), "core_grpc.Request") proto.RegisterType((*RequestBroadcastTx)(nil), "core_grpc.RequestBroadcastTx") - proto.RegisterType((*Response)(nil), "core_grpc.Response") proto.RegisterType((*ResponseBroadcastTx)(nil), "core_grpc.ResponseBroadcastTx") } @@ -318,19 +148,19 @@ var _BroadcastAPI_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("types.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 222 bytes of a gzipped FileDescriptorProto + // 223 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2e, 0xa9, 0x2c, 0x48, 0x2d, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4c, 0xce, 0x2f, 0x4a, 0x8d, 0x4f, 0x2f, - 0x2a, 0x48, 0x56, 0x0a, 0xe3, 0x62, 0x0f, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x72, 0xe2, - 0xe2, 0x49, 0x2a, 0xca, 0x4f, 0x4c, 0x49, 0x4e, 0x2c, 0x2e, 0x89, 0x2f, 0xa9, 0x90, 0x60, 0x54, - 0x60, 0xd4, 0xe0, 0x36, 0x92, 0xd5, 0x83, 0x2b, 0xd6, 0x83, 0xaa, 0x74, 0x82, 0xa9, 0x0a, 0xa9, - 0xf0, 0x60, 0x08, 0xe2, 0x4e, 0x42, 0x70, 0x9d, 0xd8, 0xb9, 0x58, 0xcb, 0x12, 0x73, 0x4a, 0x53, - 0x95, 0x54, 0xb8, 0x84, 0x30, 0x55, 0x0b, 0xf1, 0x71, 0x31, 0x41, 0x0d, 0xe6, 0x09, 0x02, 0xb2, - 0x94, 0x22, 0xb8, 0x38, 0x82, 0x52, 0x8b, 0x0b, 0xf2, 0xf3, 0x8a, 0x53, 0x85, 0x9c, 0xb1, 0x5a, - 0x2f, 0x87, 0x62, 0x3d, 0x44, 0x29, 0x31, 0xf6, 0xfb, 0x73, 0x09, 0x63, 0x51, 0x2e, 0x24, 0xc4, - 0xc5, 0x92, 0x9c, 0x9f, 0x92, 0x0a, 0x36, 0x9c, 0x25, 0x08, 0xcc, 0x06, 0x89, 0xa5, 0x24, 0x96, - 0x24, 0x4a, 0x30, 0x81, 0x9d, 0x05, 0x66, 0x0b, 0x09, 0x70, 0x31, 0xe7, 0xe4, 0xa7, 0x4b, 0x30, - 0x03, 0x85, 0x38, 0x83, 0x40, 0x4c, 0xa3, 0x18, 0x2e, 0x1e, 0xb8, 0x41, 0x8e, 0x01, 0x9e, 0x42, - 0x3e, 0x5c, 0xdc, 0xc8, 0x06, 0xe3, 0x0f, 0x26, 0x29, 0x02, 0xde, 0x48, 0x62, 0x03, 0x47, 0x8c, - 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xb2, 0x77, 0xda, 0x85, 0xa7, 0x01, 0x00, 0x00, + 0x2a, 0x48, 0x96, 0xd2, 0x49, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x2f, + 0x49, 0xcd, 0x4b, 0x49, 0x2d, 0xca, 0xcd, 0xcc, 0x2b, 0xd1, 0x2f, 0xc9, 0x2d, 0x2e, 0xd0, 0x07, + 0x6b, 0xd1, 0x47, 0xd2, 0xa8, 0xa4, 0xc2, 0x25, 0x14, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0xe2, + 0x54, 0x94, 0x9f, 0x98, 0x92, 0x9c, 0x58, 0x5c, 0x12, 0x52, 0x21, 0xc4, 0xc7, 0xc5, 0x54, 0x52, + 0x21, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x13, 0x04, 0x64, 0x29, 0xd5, 0x71, 0x09, 0x07, 0xa5, 0x16, + 0x17, 0xe4, 0xe7, 0x15, 0xa7, 0x22, 0x2b, 0x33, 0xe4, 0xe2, 0x48, 0xce, 0x48, 0x4d, 0xce, 0x8e, + 0x87, 0x2a, 0xe6, 0x36, 0x12, 0xd3, 0x83, 0x18, 0x0e, 0x53, 0xed, 0x0c, 0x92, 0x0e, 0xa9, 0x08, + 0x62, 0x4f, 0x86, 0x30, 0x84, 0x4c, 0xb8, 0x38, 0x13, 0x0b, 0x0a, 0x80, 0xce, 0x02, 0xe9, 0x61, + 0x02, 0xeb, 0x11, 0x47, 0xd3, 0xe3, 0x08, 0x96, 0x07, 0x6a, 0xe2, 0x48, 0x84, 0xb2, 0x8c, 0x62, + 0xb8, 0x78, 0xe0, 0xf6, 0x3a, 0x06, 0x78, 0x0a, 0xf9, 0x70, 0x71, 0x23, 0xbb, 0x43, 0x56, 0x0f, + 0xee, 0x7d, 0x3d, 0x4c, 0xdf, 0x48, 0xc9, 0xa1, 0x48, 0x63, 0x78, 0x23, 0x89, 0x0d, 0x1c, 0x14, + 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0x68, 0x73, 0x87, 0xb0, 0x52, 0x01, 0x00, 0x00, } diff --git a/rpc/grpc/types.proto b/rpc/grpc/types.proto index 2f2b96de..ec7f0d1e 100644 --- a/rpc/grpc/types.proto +++ b/rpc/grpc/types.proto @@ -1,18 +1,14 @@ syntax = "proto3"; package core_grpc; +import "github.com/tendermint/tmsp/types/types.proto"; + //---------------------------------------- // Message types //---------------------------------------- // Request types -message Request { - oneof value{ - RequestBroadcastTx broadcast_tx = 1; - } -} - message RequestBroadcastTx { bytes tx = 1; } @@ -20,17 +16,9 @@ message RequestBroadcastTx { //---------------------------------------- // Response types - -message Response { - oneof value{ - ResponseBroadcastTx broadcast_tx = 1; - } -} - message ResponseBroadcastTx{ - uint64 code = 1; // TODO: import tmsp ... - bytes data = 2; - string log = 3; + types.ResponseCheckTx check_tx = 1; + types.ResponseAppendTx append_tx = 2; } //---------------------------------------- diff --git a/rpc/test/client_test.go b/rpc/test/client_test.go index 728e87bd..73729b4f 100644 --- a/rpc/test/client_test.go +++ b/rpc/test/client_test.go @@ -193,9 +193,14 @@ func TestJSONBroadcastTxCommit(t *testing.T) { func testBroadcastTxCommit(t *testing.T, resI interface{}, tx []byte) { tmRes := resI.(*ctypes.TMResult) - res := (*tmRes).(*ctypes.ResultBroadcastTx) - if res.Code != tmsp.CodeType_OK { - panic(Fmt("BroadcastTxCommit got non-zero exit code: %v. %X; %s", res.Code, res.Data, res.Log)) + res := (*tmRes).(*ctypes.ResultBroadcastTxCommit) + checkTx := res.CheckTx + if checkTx.Code != tmsp.CodeType_OK { + panic(Fmt("BroadcastTxCommit got non-zero exit code from CheckTx: %v. %X; %s", checkTx.Code, checkTx.Data, checkTx.Log)) + } + appendTx := res.AppendTx + if appendTx.Code != tmsp.CodeType_OK { + panic(Fmt("BroadcastTxCommit got non-zero exit code from CheckTx: %v. %X; %s", appendTx.Code, appendTx.Data, appendTx.Log)) } mem := node.MempoolReactor().Mempool if mem.Size() != 0 { diff --git a/rpc/test/grpc_test.go b/rpc/test/grpc_test.go index 8fad465b..13672773 100644 --- a/rpc/test/grpc_test.go +++ b/rpc/test/grpc_test.go @@ -15,7 +15,10 @@ func TestBroadcastTx(t *testing.T) { if err != nil { t.Fatal(err) } - if res.Code != 0 { - t.Fatalf("Non-zero code: %d", res.Code) + if res.CheckTx.Code != 0 { + t.Fatalf("Non-zero check tx code: %d", res.CheckTx.Code) + } + if res.AppendTx.Code != 0 { + t.Fatalf("Non-zero append tx code: %d", res.AppendTx.Code) } }