playing with show validator cmd and experimenting with spawning new

sessions / connections if the connection is lost
This commit is contained in:
Ismail Khoffi 2018-12-13 12:26:11 +01:00
parent 8a993c6486
commit 1ee32d3089
5 changed files with 105 additions and 69 deletions

View File

@ -2,12 +2,20 @@ package commands
import ( import (
"fmt" "fmt"
"os"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/tendermint/tendermint/crypto"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/node"
"github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/privval"
) )
func init() {
ShowValidatorCmd.Flags().String("priv_validator_laddr", config.PrivValidatorListenAddr, "Socket address to listen on for connections from external priv_validator process")
}
// ShowValidatorCmd adds capabilities for showing the validator info. // ShowValidatorCmd adds capabilities for showing the validator info.
var ShowValidatorCmd = &cobra.Command{ var ShowValidatorCmd = &cobra.Command{
Use: "show_validator", Use: "show_validator",
@ -16,10 +24,25 @@ var ShowValidatorCmd = &cobra.Command{
} }
func showValidator(cmd *cobra.Command, args []string) { func showValidator(cmd *cobra.Command, args []string) {
// TODO(ismail): add a flag and check if we actually want to see the pub key var pubKey crypto.PubKey
// of the remote signer instead of the FilePV if config.PrivValidatorListenAddr != "" {
// If an address is provided, listen on the socket for a connection from an
// external signing process and request the public key from there.
privValidator, err := node.CreateAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to get private validator's public key: %v", err)
os.Exit(-1)
}
if pvsc, ok := privValidator.(cmn.Service); ok {
if err := pvsc.Stop(); err != nil {
fmt.Fprintf(os.Stderr, "Failed to get stop private validator client: %v", err)
}
}
pubKey = privValidator.GetPubKey()
} else {
privValidator := privval.LoadOrGenFilePV(config.PrivValidatorFile()) privValidator := privval.LoadOrGenFilePV(config.PrivValidatorFile())
key := privValidator.GetPubKey() pubKey = privValidator.GetPubKey()
pubKeyJSONBytes, _ := cdc.MarshalJSON(key) }
pubKeyJSONBytes, _ := cdc.MarshalJSON(pubKey)
fmt.Println(string(pubKeyJSONBytes)) fmt.Println(string(pubKeyJSONBytes))
} }

View File

@ -229,7 +229,7 @@ func NewNode(config *cfg.Config,
// If an address is provided, listen on the socket for a connection from an // If an address is provided, listen on the socket for a connection from an
// external signing process. // external signing process.
// FIXME: we should start services inside OnStart // FIXME: we should start services inside OnStart
privValidator, err = createAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger) privValidator, err = CreateAndStartPrivValidatorSocketClient(config.PrivValidatorListenAddr, logger)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "Error with private validator socket client") return nil, errors.Wrap(err, "Error with private validator socket client")
} }
@ -853,7 +853,7 @@ func saveGenesisDoc(db dbm.DB, genDoc *types.GenesisDoc) {
db.SetSync(genesisDocKey, bytes) db.SetSync(genesisDocKey, bytes)
} }
func createAndStartPrivValidatorSocketClient( func CreateAndStartPrivValidatorSocketClient(
listenAddr string, listenAddr string,
logger log.Logger, logger log.Logger,
) (types.PrivValidator, error) { ) (types.PrivValidator, error) {

View File

@ -230,6 +230,7 @@ func writeMsg(w io.Writer, msg interface{}) (err error) {
} }
func handleRequest(req RemoteSignerMsg, chainID string, privVal types.PrivValidator) (RemoteSignerMsg, error) { func handleRequest(req RemoteSignerMsg, chainID string, privVal types.PrivValidator) (RemoteSignerMsg, error) {
fmt.Println("handle req", req)
var res RemoteSignerMsg var res RemoteSignerMsg
var err error var err error

View File

@ -115,10 +115,13 @@ func (sc *TCPVal) OnStart() error {
// Start a routine to keep the connection alive // Start a routine to keep the connection alive
sc.cancelPing = make(chan struct{}, 1) sc.cancelPing = make(chan struct{}, 1)
sc.pingTicker = time.NewTicker(sc.connHeartbeat) sc.pingTicker = time.NewTicker(sc.connHeartbeat)
go func() { /*go func() {
for { for {
select { select {
case <-sc.pingTicker.C: case <-sc.pingTicker.C:
sc.Logger.Error(
"Pinging",
)
err := sc.Ping() err := sc.Ping()
if err != nil { if err != nil {
sc.Logger.Error( sc.Logger.Error(
@ -127,11 +130,14 @@ func (sc *TCPVal) OnStart() error {
) )
} }
case <-sc.cancelPing: case <-sc.cancelPing:
sc.Logger.Error(
"Pinging Stopped",
)
sc.pingTicker.Stop() sc.pingTicker.Stop()
return return
} }
} }
}() }()*/
return nil return nil
} }

View File

@ -1,6 +1,7 @@
package privval package privval
import ( import (
"fmt"
"io" "io"
"net" "net"
"time" "time"
@ -63,13 +64,7 @@ func NewRemoteSigner(
// OnStart implements cmn.Service. // OnStart implements cmn.Service.
func (rs *RemoteSigner) OnStart() error { func (rs *RemoteSigner) OnStart() error {
conn, err := rs.connect() go rs.handleConnection()
if err != nil {
rs.Logger.Error("OnStart", "err", err)
return err
}
go rs.handleConnection(conn)
return nil return nil
} }
@ -86,11 +81,11 @@ func (rs *RemoteSigner) OnStop() {
} }
func (rs *RemoteSigner) connect() (net.Conn, error) { func (rs *RemoteSigner) connect() (net.Conn, error) {
for retries := rs.connRetries; retries > 0; retries-- { //for retries := rs.connRetries; retries > 0; retries-- {
// Don't sleep if it is the first retry. // Don't sleep if it is the first retry.
if retries != rs.connRetries { //if retries != rs.connRetries {
time.Sleep(rs.connDeadline) // time.Sleep(rs.connDeadline)
} //}
conn, err := cmn.Connect(rs.addr) conn, err := cmn.Connect(rs.addr)
if err != nil { if err != nil {
@ -100,16 +95,17 @@ func (rs *RemoteSigner) connect() (net.Conn, error) {
"err", err, "err", err,
) )
continue return nil, err
} }
if err := conn.SetDeadline(time.Now().Add(connTimeout)); err != nil { //if err := conn.SetDeadline(time.Now().Add(connTimeout)); err != nil {
rs.Logger.Error( // rs.Logger.Error(
"connect", // "connect",
"err", err, // "err", err,
) // )
continue // //continue
} // return nil, err
//}
conn, err = p2pconn.MakeSecretConnection(conn, rs.privKey) conn, err = p2pconn.MakeSecretConnection(conn, rs.privKey)
if err != nil { if err != nil {
@ -118,43 +114,53 @@ func (rs *RemoteSigner) connect() (net.Conn, error) {
"err", err, "err", err,
) )
continue return nil, err
} }
fmt.Println("connected", conn.RemoteAddr())
return conn, nil return conn, nil
} //}
return nil, ErrDialRetryMax return nil, ErrDialRetryMax
} }
func (rs *RemoteSigner) handleConnection(conn net.Conn) { func (rs *RemoteSigner) handleConnection() {
for { for { // establish connection loop:
if !rs.IsRunning() { if !rs.IsRunning() {
return // Ignore error from listener closing. return // Ignore error from listener closing.
} }
fmt.Println("Connecting again ...")
conn, err := rs.connect()
if err != nil {
rs.Logger.Error("OnStart", "err", err)
fmt.Println("failed to connect", err)
time.Sleep(rs.connDeadline)
continue
}
// Reset the connection deadline // Reset the connection deadline
conn.SetDeadline(time.Now().Add(rs.connDeadline)) //conn.SetDeadline(time.Now().Add(rs.connDeadline*10))
for { // handle request loop
req, err := readMsg(conn) req, err := readMsg(conn)
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
rs.Logger.Error("handleConnection", "err", err) rs.Logger.Error("handleConnection readMsg", "err", err)
} }
return conn.Close()
break
} }
res, err := handleRequest(req, rs.chainID, rs.privVal) res, err := handleRequest(req, rs.chainID, rs.privVal)
if err != nil { if err != nil {
// only log the error; we'll reply with an error in res // only log the error; we'll reply with an error in res
rs.Logger.Error("handleConnection", "err", err) rs.Logger.Error("handleConnection handleRequest", "err", err)
} }
err = writeMsg(conn, res) err = writeMsg(conn, res)
if err != nil { if err != nil {
rs.Logger.Error("handleConnection", "err", err) rs.Logger.Error("handleConnection writeMsg", "err", err)
return break
}
} }
} }
} }