mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-18 15:41:20 +00:00
Correct server protocol
This commit is contained in:
committed by
Ethan Buchman
parent
ff600e9aa0
commit
fec541373d
@ -2,7 +2,6 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
cmn "github.com/tendermint/tmlibs/common"
|
cmn "github.com/tendermint/tmlibs/common"
|
||||||
@ -11,20 +10,27 @@ import (
|
|||||||
priv_val "github.com/tendermint/tendermint/types/priv_validator"
|
priv_val "github.com/tendermint/tendermint/types/priv_validator"
|
||||||
)
|
)
|
||||||
|
|
||||||
var chainID = flag.String("chain-id", "mychain", "chain id")
|
|
||||||
var privValPath = flag.String("priv", "", "priv val file path")
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
var (
|
||||||
|
chainID = flag.String("chain-id", "mychain", "chain id")
|
||||||
|
privValPath = flag.String("priv", "", "priv val file path")
|
||||||
|
socketAddr = flag.String("socket.addr", ":46659", "socket bind addr")
|
||||||
|
|
||||||
|
logger = log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "priv_val")
|
||||||
|
)
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
logger := log.NewTMLogger(log.NewSyncWriter(os.Stdout)).With("module", "priv_val")
|
logger.Info("args", "chainID", *chainID, "privPath", *privValPath)
|
||||||
socketAddr := "localhost:46659"
|
|
||||||
fmt.Println(*chainID)
|
|
||||||
fmt.Println(*privValPath)
|
|
||||||
privVal := priv_val.LoadPrivValidatorJSON(*privValPath)
|
privVal := priv_val.LoadPrivValidatorJSON(*privValPath)
|
||||||
|
|
||||||
pvss := priv_val.NewPrivValidatorSocketServer(logger, socketAddr, *chainID, privVal)
|
pvss := priv_val.NewPrivValidatorSocketServer(
|
||||||
pvss.Start()
|
logger,
|
||||||
|
*socketAddr,
|
||||||
|
*chainID,
|
||||||
|
privVal,
|
||||||
|
)
|
||||||
|
// pvss.Start()
|
||||||
|
|
||||||
cmn.TrapSignal(func() {
|
cmn.TrapSignal(func() {
|
||||||
pvss.Stop()
|
pvss.Stop()
|
||||||
|
@ -3,6 +3,7 @@ package types
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -11,6 +12,7 @@ import (
|
|||||||
"github.com/tendermint/go-wire/data"
|
"github.com/tendermint/go-wire/data"
|
||||||
cmn "github.com/tendermint/tmlibs/common"
|
cmn "github.com/tendermint/tmlibs/common"
|
||||||
"github.com/tendermint/tmlibs/log"
|
"github.com/tendermint/tmlibs/log"
|
||||||
|
"golang.org/x/net/netutil"
|
||||||
|
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
)
|
)
|
||||||
@ -82,16 +84,16 @@ func (pvsc *PrivValidatorSocketClient) Address() data.Bytes {
|
|||||||
|
|
||||||
// PubKey implements PrivValidator.
|
// PubKey implements PrivValidator.
|
||||||
func (pvsc *PrivValidatorSocketClient) PubKey() crypto.PubKey {
|
func (pvsc *PrivValidatorSocketClient) PubKey() crypto.PubKey {
|
||||||
res, err := readWrite(pvsc.conn, PubKeyMsg{})
|
res, err := readWrite(pvsc.conn, &PubKeyMsg{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
return res.(PubKeyMsg).PubKey
|
return res.(*PubKeyMsg).PubKey
|
||||||
}
|
}
|
||||||
|
|
||||||
// SignVote implements PrivValidator.
|
// SignVote implements PrivValidator.
|
||||||
func (pvsc *PrivValidatorSocketClient) SignVote(chainID string, vote *types.Vote) error {
|
func (pvsc *PrivValidatorSocketClient) SignVote(chainID string, vote *types.Vote) error {
|
||||||
res, err := readWrite(pvsc.conn, SignVoteMsg{Vote: vote})
|
res, err := readWrite(pvsc.conn, &SignVoteMsg{Vote: vote})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -101,7 +103,7 @@ func (pvsc *PrivValidatorSocketClient) SignVote(chainID string, vote *types.Vote
|
|||||||
|
|
||||||
// SignProposal implements PrivValidator.
|
// SignProposal implements PrivValidator.
|
||||||
func (pvsc *PrivValidatorSocketClient) SignProposal(chainID string, proposal *types.Proposal) error {
|
func (pvsc *PrivValidatorSocketClient) SignProposal(chainID string, proposal *types.Proposal) error {
|
||||||
res, err := readWrite(pvsc.conn, SignProposalMsg{Proposal: proposal})
|
res, err := readWrite(pvsc.conn, &SignProposalMsg{Proposal: proposal})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -111,7 +113,7 @@ func (pvsc *PrivValidatorSocketClient) SignProposal(chainID string, proposal *ty
|
|||||||
|
|
||||||
// SignHeartbeat implements PrivValidator.
|
// SignHeartbeat implements PrivValidator.
|
||||||
func (pvsc *PrivValidatorSocketClient) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error {
|
func (pvsc *PrivValidatorSocketClient) SignHeartbeat(chainID string, heartbeat *types.Heartbeat) error {
|
||||||
res, err := readWrite(pvsc.conn, SignHeartbeatMsg{Heartbeat: heartbeat})
|
res, err := readWrite(pvsc.conn, &SignHeartbeatMsg{Heartbeat: heartbeat})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -126,8 +128,10 @@ func (pvsc *PrivValidatorSocketClient) SignHeartbeat(chainID string, heartbeat *
|
|||||||
type PrivValidatorSocketServer struct {
|
type PrivValidatorSocketServer struct {
|
||||||
cmn.BaseService
|
cmn.BaseService
|
||||||
|
|
||||||
proto, addr string
|
proto, addr string
|
||||||
listener net.Listener
|
listener net.Listener
|
||||||
|
maxConnections int
|
||||||
|
privKey crypto.PrivKeyEd25519
|
||||||
|
|
||||||
privVal PrivValidator
|
privVal PrivValidator
|
||||||
chainID string
|
chainID string
|
||||||
@ -135,13 +139,21 @@ type PrivValidatorSocketServer struct {
|
|||||||
|
|
||||||
// NewPrivValidatorSocketServer returns an instance of
|
// NewPrivValidatorSocketServer returns an instance of
|
||||||
// PrivValidatorSocketServer.
|
// PrivValidatorSocketServer.
|
||||||
func NewPrivValidatorSocketServer(logger log.Logger, socketAddr, chainID string, privVal PrivValidator) *PrivValidatorSocketServer {
|
func NewPrivValidatorSocketServer(
|
||||||
|
logger log.Logger,
|
||||||
|
socketAddr, chainID string,
|
||||||
|
privVal PrivValidator,
|
||||||
|
privKey crypto.PrivKeyEd25519,
|
||||||
|
maxConnections int,
|
||||||
|
) *PrivValidatorSocketServer {
|
||||||
proto, addr := cmn.ProtocolAndAddress(socketAddr)
|
proto, addr := cmn.ProtocolAndAddress(socketAddr)
|
||||||
pvss := &PrivValidatorSocketServer{
|
pvss := &PrivValidatorSocketServer{
|
||||||
proto: proto,
|
proto: proto,
|
||||||
addr: addr,
|
addr: addr,
|
||||||
privVal: privVal,
|
maxConnections: maxConnections,
|
||||||
chainID: chainID,
|
privKey: privKey,
|
||||||
|
privVal: privVal,
|
||||||
|
chainID: chainID,
|
||||||
}
|
}
|
||||||
pvss.BaseService = *cmn.NewBaseService(logger, "privValidatorSocketServer", pvss)
|
pvss.BaseService = *cmn.NewBaseService(logger, "privValidatorSocketServer", pvss)
|
||||||
return pvss
|
return pvss
|
||||||
@ -149,22 +161,20 @@ func NewPrivValidatorSocketServer(logger log.Logger, socketAddr, chainID string,
|
|||||||
|
|
||||||
// OnStart implements cmn.Service.
|
// OnStart implements cmn.Service.
|
||||||
func (pvss *PrivValidatorSocketServer) OnStart() error {
|
func (pvss *PrivValidatorSocketServer) OnStart() error {
|
||||||
if err := pvss.BaseService.OnStart(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
ln, err := net.Listen(pvss.proto, pvss.addr)
|
ln, err := net.Listen(pvss.proto, pvss.addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
pvss.listener = ln
|
|
||||||
|
pvss.listener = netutil.LimitListener(ln, pvss.maxConnections)
|
||||||
|
|
||||||
go pvss.acceptConnectionsRoutine()
|
go pvss.acceptConnectionsRoutine()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnStop implements cmn.Service.
|
// OnStop implements cmn.Service.
|
||||||
func (pvss *PrivValidatorSocketServer) OnStop() {
|
func (pvss *PrivValidatorSocketServer) OnStop() {
|
||||||
pvss.BaseService.OnStop()
|
|
||||||
|
|
||||||
if pvss.listener == nil {
|
if pvss.listener == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -176,9 +186,6 @@ func (pvss *PrivValidatorSocketServer) OnStop() {
|
|||||||
|
|
||||||
func (pvss *PrivValidatorSocketServer) acceptConnectionsRoutine() {
|
func (pvss *PrivValidatorSocketServer) acceptConnectionsRoutine() {
|
||||||
for {
|
for {
|
||||||
// Accept a connection
|
|
||||||
pvss.Logger.Info("Waiting for new connection...")
|
|
||||||
|
|
||||||
conn, err := pvss.listener.Accept()
|
conn, err := pvss.listener.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !pvss.IsRunning() {
|
if !pvss.IsRunning() {
|
||||||
@ -188,47 +195,46 @@ func (pvss *PrivValidatorSocketServer) acceptConnectionsRoutine() {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
pvss.Logger.Info("Accepted a new connection")
|
go pvss.handleConnection(conn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// read/write
|
func (pvss *PrivValidatorSocketServer) handleConnection(conn net.Conn) {
|
||||||
for {
|
defer conn.Close()
|
||||||
if !pvss.IsRunning() {
|
|
||||||
return // Ignore error from listener closing.
|
|
||||||
}
|
|
||||||
|
|
||||||
var n int
|
for {
|
||||||
var err error
|
if !pvss.IsRunning() {
|
||||||
b := wire.ReadByteSlice(conn, 0, &n, &err) //XXX: no max
|
return // Ignore error from listener closing.
|
||||||
if err != nil {
|
}
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
req, err := decodeMsg(b)
|
req, err := readMsg(conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
pvss.Logger.Error("readMsg", "err", err)
|
||||||
}
|
return
|
||||||
var res PrivValidatorSocketMsg
|
}
|
||||||
switch r := req.(type) {
|
|
||||||
case PubKeyMsg:
|
|
||||||
res = PubKeyMsg{pvss.privVal.PubKey()}
|
|
||||||
case SignVoteMsg:
|
|
||||||
pvss.privVal.SignVote(pvss.chainID, r.Vote)
|
|
||||||
res = SignVoteMsg{r.Vote}
|
|
||||||
case SignProposalMsg:
|
|
||||||
pvss.privVal.SignProposal(pvss.chainID, r.Proposal)
|
|
||||||
res = SignProposalMsg{r.Proposal}
|
|
||||||
case SignHeartbeatMsg:
|
|
||||||
pvss.privVal.SignHeartbeat(pvss.chainID, r.Heartbeat)
|
|
||||||
res = SignHeartbeatMsg{r.Heartbeat}
|
|
||||||
default:
|
|
||||||
panic(fmt.Sprintf("unknown msg: %v", r))
|
|
||||||
}
|
|
||||||
|
|
||||||
b = wire.BinaryBytes(res)
|
var res PrivValidatorSocketMsg
|
||||||
_, err = conn.Write(b)
|
|
||||||
if err != nil {
|
switch r := req.(type) {
|
||||||
panic(err)
|
case *PubKeyMsg:
|
||||||
}
|
res = &PubKeyMsg{pvss.privVal.PubKey()}
|
||||||
|
case *SignVoteMsg:
|
||||||
|
pvss.privVal.SignVote(pvss.chainID, r.Vote)
|
||||||
|
res = &SignVoteMsg{r.Vote}
|
||||||
|
case *SignProposalMsg:
|
||||||
|
pvss.privVal.SignProposal(pvss.chainID, r.Proposal)
|
||||||
|
res = &SignProposalMsg{r.Proposal}
|
||||||
|
case *SignHeartbeatMsg:
|
||||||
|
pvss.privVal.SignHeartbeat(pvss.chainID, r.Heartbeat)
|
||||||
|
res = &SignHeartbeatMsg{r.Heartbeat}
|
||||||
|
default:
|
||||||
|
panic(fmt.Sprintf("unknown msg: %v", r))
|
||||||
|
}
|
||||||
|
|
||||||
|
b := wire.BinaryBytes(res)
|
||||||
|
_, err = conn.Write(b)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -254,23 +260,47 @@ var _ = wire.RegisterInterface(
|
|||||||
wire.ConcreteType{&SignHeartbeatMsg{}, msgTypeSignHeartbeat},
|
wire.ConcreteType{&SignHeartbeatMsg{}, msgTypeSignHeartbeat},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func readMsg(r io.Reader) (PrivValidatorSocketMsg, error) {
|
||||||
|
var (
|
||||||
|
n int
|
||||||
|
err error
|
||||||
|
)
|
||||||
|
|
||||||
|
read := wire.ReadBinary(struct{ PrivValidatorSocketMsg }{}, r, 0, &n, &err)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
w, ok := read.(struct{ PrivValidatorSocketMsg })
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("unknown type")
|
||||||
|
}
|
||||||
|
|
||||||
|
return w.PrivValidatorSocketMsg, nil
|
||||||
|
}
|
||||||
|
|
||||||
func readWrite(conn net.Conn, req PrivValidatorSocketMsg) (res PrivValidatorSocketMsg, err error) {
|
func readWrite(conn net.Conn, req PrivValidatorSocketMsg) (res PrivValidatorSocketMsg, err error) {
|
||||||
b := wire.BinaryBytes(req)
|
b := wire.BinaryBytes(req)
|
||||||
|
|
||||||
_, err = conn.Write(b)
|
_, err = conn.Write(b)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var n int
|
return readMsg(conn)
|
||||||
b = wire.ReadByteSlice(conn, 0, &n, &err) //XXX: no max
|
|
||||||
return decodeMsg(b)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func decodeMsg(bz []byte) (msg PrivValidatorSocketMsg, err error) {
|
func decodeMsg(bz []byte) (msg PrivValidatorSocketMsg, err error) {
|
||||||
n := new(int)
|
var (
|
||||||
r := bytes.NewReader(bz)
|
r = bytes.NewReader(bz)
|
||||||
msgI := wire.ReadBinary(struct{ PrivValidatorSocketMsg }{}, r, 0, n, &err)
|
|
||||||
|
n int
|
||||||
|
)
|
||||||
|
|
||||||
|
msgI := wire.ReadBinary(struct{ PrivValidatorSocketMsg }{}, r, 0, &n, &err)
|
||||||
|
|
||||||
msg = msgI.(struct{ PrivValidatorSocketMsg }).PrivValidatorSocketMsg
|
msg = msgI.(struct{ PrivValidatorSocketMsg }).PrivValidatorSocketMsg
|
||||||
|
|
||||||
return msg, err
|
return msg, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
44
types/priv_validator/socket_test.go
Normal file
44
types/priv_validator/socket_test.go
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
package types
|
||||||
|
|
||||||
|
import (
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
crypto "github.com/tendermint/go-crypto"
|
||||||
|
"github.com/tendermint/tendermint/types"
|
||||||
|
"github.com/tendermint/tmlibs/log"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPrivValidatorSocketServer(t *testing.T) {
|
||||||
|
var (
|
||||||
|
chainID = "test-chain"
|
||||||
|
logger = log.TestingLogger()
|
||||||
|
signer = types.GenSigner()
|
||||||
|
privKey = crypto.GenPrivKeyEd25519()
|
||||||
|
privVal = NewTestPrivValidator(signer)
|
||||||
|
pvss = NewPrivValidatorSocketServer(
|
||||||
|
logger,
|
||||||
|
"127.0.0.1:0",
|
||||||
|
chainID,
|
||||||
|
privVal,
|
||||||
|
privKey,
|
||||||
|
1,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
err := pvss.Start()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
c := NewPrivValidatorSocketClient(logger, pvss.listener.Addr().String())
|
||||||
|
|
||||||
|
err = c.Start()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if have, want := c.PubKey(), pvss.privVal.PubKey(); !reflect.DeepEqual(have, want) {
|
||||||
|
t.Errorf("have %v, want %v", have, want)
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user