tendermint/privval/socket.go
Alexander Simmerl bf370d36c2
Extract priv_validator into first class package
This is a maintenance change to move the private validator package out
of the types and to a top-level location. There is no good reason to
keep it under the types and it will more clearly coommunicate where
additions related to the privval belong. It leaves the interface and the
mock in types for now as it would introduce circular dependency between
privval and types, this should be resolved eventually.

* mv priv_validator to privval pkg
* use consistent `privval` as import

Follow-up to #1255
2018-06-03 13:51:58 +02:00

539 lines
12 KiB
Go

package privval
import (
"errors"
"fmt"
"io"
"net"
"time"
"github.com/tendermint/go-amino"
"github.com/tendermint/go-crypto"
cmn "github.com/tendermint/tmlibs/common"
"github.com/tendermint/tmlibs/log"
p2pconn "github.com/tendermint/tendermint/p2p/conn"
"github.com/tendermint/tendermint/types"
)
const (
defaultAcceptDeadlineSeconds = 3
defaultConnDeadlineSeconds = 3
defaultConnHeartBeatSeconds = 30
defaultConnWaitSeconds = 60
defaultDialRetries = 10
)
// Socket errors.
var (
ErrDialRetryMax = errors.New("dialed maximum retries")
ErrConnWaitTimeout = errors.New("waited for remote signer for too long")
ErrConnTimeout = errors.New("remote signer timed out")
)
var (
acceptDeadline = time.Second + defaultAcceptDeadlineSeconds
connDeadline = time.Second * defaultConnDeadlineSeconds
connHeartbeat = time.Second * defaultConnHeartBeatSeconds
)
// SocketPVOption sets an optional parameter on the SocketPV.
type SocketPVOption func(*SocketPV)
// SocketPVAcceptDeadline sets the deadline for the SocketPV listener.
// A zero time value disables the deadline.
func SocketPVAcceptDeadline(deadline time.Duration) SocketPVOption {
return func(sc *SocketPV) { sc.acceptDeadline = deadline }
}
// SocketPVConnDeadline sets the read and write deadline for connections
// from external signing processes.
func SocketPVConnDeadline(deadline time.Duration) SocketPVOption {
return func(sc *SocketPV) { sc.connDeadline = deadline }
}
// SocketPVHeartbeat sets the period on which to check the liveness of the
// connected Signer connections.
func SocketPVHeartbeat(period time.Duration) SocketPVOption {
return func(sc *SocketPV) { sc.connHeartbeat = period }
}
// SocketPVConnWait sets the timeout duration before connection of external
// signing processes are considered to be unsuccessful.
func SocketPVConnWait(timeout time.Duration) SocketPVOption {
return func(sc *SocketPV) { sc.connWaitTimeout = timeout }
}
// SocketPV implements PrivValidator, it uses a socket to request signatures
// from an external process.
type SocketPV struct {
cmn.BaseService
addr string
acceptDeadline time.Duration
connDeadline time.Duration
connHeartbeat time.Duration
connWaitTimeout time.Duration
privKey crypto.PrivKeyEd25519
conn net.Conn
listener net.Listener
}
// Check that SocketPV implements PrivValidator.
var _ types.PrivValidator = (*SocketPV)(nil)
// NewSocketPV returns an instance of SocketPV.
func NewSocketPV(
logger log.Logger,
socketAddr string,
privKey crypto.PrivKeyEd25519,
) *SocketPV {
sc := &SocketPV{
addr: socketAddr,
acceptDeadline: acceptDeadline,
connDeadline: connDeadline,
connHeartbeat: connHeartbeat,
connWaitTimeout: time.Second * defaultConnWaitSeconds,
privKey: privKey,
}
sc.BaseService = *cmn.NewBaseService(logger, "SocketPV", sc)
return sc
}
// GetAddress implements PrivValidator.
func (sc *SocketPV) GetAddress() types.Address {
addr, err := sc.getAddress()
if err != nil {
panic(err)
}
return addr
}
// Address is an alias for PubKey().Address().
func (sc *SocketPV) getAddress() (cmn.HexBytes, error) {
p, err := sc.getPubKey()
if err != nil {
return nil, err
}
return p.Address(), nil
}
// GetPubKey implements PrivValidator.
func (sc *SocketPV) GetPubKey() crypto.PubKey {
pubKey, err := sc.getPubKey()
if err != nil {
panic(err)
}
return pubKey
}
func (sc *SocketPV) getPubKey() (crypto.PubKey, error) {
err := writeMsg(sc.conn, &PubKeyMsg{})
if err != nil {
return nil, err
}
res, err := readMsg(sc.conn)
if err != nil {
return nil, err
}
return res.(*PubKeyMsg).PubKey, nil
}
// SignVote implements PrivValidator.
func (sc *SocketPV) SignVote(chainID string, vote *types.Vote) error {
err := writeMsg(sc.conn, &SignVoteMsg{Vote: vote})
if err != nil {
return err
}
res, err := readMsg(sc.conn)
if err != nil {
return err
}
*vote = *res.(*SignVoteMsg).Vote
return nil
}
// SignProposal implements PrivValidator.
func (sc *SocketPV) SignProposal(
chainID string,
proposal *types.Proposal,
) error {
err := writeMsg(sc.conn, &SignProposalMsg{Proposal: proposal})
if err != nil {
return err
}
res, err := readMsg(sc.conn)
if err != nil {
return err
}
*proposal = *res.(*SignProposalMsg).Proposal
return nil
}
// SignHeartbeat implements PrivValidator.
func (sc *SocketPV) SignHeartbeat(
chainID string,
heartbeat *types.Heartbeat,
) error {
err := writeMsg(sc.conn, &SignHeartbeatMsg{Heartbeat: heartbeat})
if err != nil {
return err
}
res, err := readMsg(sc.conn)
if err != nil {
return err
}
*heartbeat = *res.(*SignHeartbeatMsg).Heartbeat
return nil
}
// OnStart implements cmn.Service.
func (sc *SocketPV) OnStart() error {
if err := sc.listen(); err != nil {
err = cmn.ErrorWrap(err, "failed to listen")
sc.Logger.Error(
"OnStart",
"err", err,
)
return err
}
conn, err := sc.waitConnection()
if err != nil {
err = cmn.ErrorWrap(err, "failed to accept connection")
sc.Logger.Error(
"OnStart",
"err", err,
)
return err
}
sc.conn = conn
return nil
}
// OnStop implements cmn.Service.
func (sc *SocketPV) OnStop() {
if sc.conn != nil {
if err := sc.conn.Close(); err != nil {
err = cmn.ErrorWrap(err, "failed to close connection")
sc.Logger.Error(
"OnStop",
"err", err,
)
}
}
if sc.listener != nil {
if err := sc.listener.Close(); err != nil {
err = cmn.ErrorWrap(err, "failed to close listener")
sc.Logger.Error(
"OnStop",
"err", err,
)
}
}
}
func (sc *SocketPV) acceptConnection() (net.Conn, error) {
conn, err := sc.listener.Accept()
if err != nil {
if !sc.IsRunning() {
return nil, nil // Ignore error from listener closing.
}
return nil, err
}
conn, err = p2pconn.MakeSecretConnection(conn, sc.privKey)
if err != nil {
return nil, err
}
return conn, nil
}
func (sc *SocketPV) listen() error {
ln, err := net.Listen(cmn.ProtocolAndAddress(sc.addr))
if err != nil {
return err
}
sc.listener = newTCPTimeoutListener(
ln,
sc.acceptDeadline,
sc.connDeadline,
sc.connHeartbeat,
)
return nil
}
// waitConnection uses the configured wait timeout to error if no external
// process connects in the time period.
func (sc *SocketPV) waitConnection() (net.Conn, error) {
var (
connc = make(chan net.Conn, 1)
errc = make(chan error, 1)
)
go func(connc chan<- net.Conn, errc chan<- error) {
conn, err := sc.acceptConnection()
if err != nil {
errc <- err
return
}
connc <- conn
}(connc, errc)
select {
case conn := <-connc:
return conn, nil
case err := <-errc:
if _, ok := err.(timeoutError); ok {
return nil, cmn.ErrorWrap(ErrConnWaitTimeout, err.Error())
}
return nil, err
case <-time.After(sc.connWaitTimeout):
return nil, ErrConnWaitTimeout
}
}
//---------------------------------------------------------
// RemoteSignerOption sets an optional parameter on the RemoteSigner.
type RemoteSignerOption func(*RemoteSigner)
// RemoteSignerConnDeadline sets the read and write deadline for connections
// from external signing processes.
func RemoteSignerConnDeadline(deadline time.Duration) RemoteSignerOption {
return func(ss *RemoteSigner) { ss.connDeadline = deadline }
}
// RemoteSignerConnRetries sets the amount of attempted retries to connect.
func RemoteSignerConnRetries(retries int) RemoteSignerOption {
return func(ss *RemoteSigner) { ss.connRetries = retries }
}
// RemoteSigner implements PrivValidator by dialing to a socket.
type RemoteSigner struct {
cmn.BaseService
addr string
chainID string
connDeadline time.Duration
connRetries int
privKey crypto.PrivKeyEd25519
privVal types.PrivValidator
conn net.Conn
}
// NewRemoteSigner returns an instance of RemoteSigner.
func NewRemoteSigner(
logger log.Logger,
chainID, socketAddr string,
privVal types.PrivValidator,
privKey crypto.PrivKeyEd25519,
) *RemoteSigner {
rs := &RemoteSigner{
addr: socketAddr,
chainID: chainID,
connDeadline: time.Second * defaultConnDeadlineSeconds,
connRetries: defaultDialRetries,
privKey: privKey,
privVal: privVal,
}
rs.BaseService = *cmn.NewBaseService(logger, "RemoteSigner", rs)
return rs
}
// OnStart implements cmn.Service.
func (rs *RemoteSigner) OnStart() error {
conn, err := rs.connect()
if err != nil {
err = cmn.ErrorWrap(err, "connect")
rs.Logger.Error("OnStart", "err", err)
return err
}
go rs.handleConnection(conn)
return nil
}
// OnStop implements cmn.Service.
func (rs *RemoteSigner) OnStop() {
if rs.conn == nil {
return
}
if err := rs.conn.Close(); err != nil {
rs.Logger.Error("OnStop", "err", cmn.ErrorWrap(err, "closing listener failed"))
}
}
func (rs *RemoteSigner) connect() (net.Conn, error) {
for retries := rs.connRetries; retries > 0; retries-- {
// Don't sleep if it is the first retry.
if retries != rs.connRetries {
time.Sleep(rs.connDeadline)
}
conn, err := cmn.Connect(rs.addr)
if err != nil {
err = cmn.ErrorWrap(err, "connection failed")
rs.Logger.Error(
"connect",
"addr", rs.addr,
"err", err,
)
continue
}
if err := conn.SetDeadline(time.Now().Add(connDeadline)); err != nil {
err = cmn.ErrorWrap(err, "setting connection timeout failed")
rs.Logger.Error(
"connect",
"err", err,
)
continue
}
conn, err = p2pconn.MakeSecretConnection(conn, rs.privKey)
if err != nil {
err = cmn.ErrorWrap(err, "encrypting connection failed")
rs.Logger.Error(
"connect",
"err", err,
)
continue
}
return conn, nil
}
return nil, ErrDialRetryMax
}
func (rs *RemoteSigner) handleConnection(conn net.Conn) {
for {
if !rs.IsRunning() {
return // Ignore error from listener closing.
}
req, err := readMsg(conn)
if err != nil {
if err != io.EOF {
rs.Logger.Error("handleConnection", "err", err)
}
return
}
var res SocketPVMsg
switch r := req.(type) {
case *PubKeyMsg:
var p crypto.PubKey
p = rs.privVal.GetPubKey()
res = &PubKeyMsg{p}
case *SignVoteMsg:
err = rs.privVal.SignVote(rs.chainID, r.Vote)
res = &SignVoteMsg{r.Vote}
case *SignProposalMsg:
err = rs.privVal.SignProposal(rs.chainID, r.Proposal)
res = &SignProposalMsg{r.Proposal}
case *SignHeartbeatMsg:
err = rs.privVal.SignHeartbeat(rs.chainID, r.Heartbeat)
res = &SignHeartbeatMsg{r.Heartbeat}
default:
err = fmt.Errorf("unknown msg: %v", r)
}
if err != nil {
rs.Logger.Error("handleConnection", "err", err)
return
}
err = writeMsg(conn, res)
if err != nil {
rs.Logger.Error("handleConnection", "err", err)
return
}
}
}
//---------------------------------------------------------
// SocketPVMsg is sent between RemoteSigner and SocketPV.
type SocketPVMsg interface{}
func RegisterSocketPVMsg(cdc *amino.Codec) {
cdc.RegisterInterface((*SocketPVMsg)(nil), nil)
cdc.RegisterConcrete(&PubKeyMsg{}, "tendermint/socketpv/PubKeyMsg", nil)
cdc.RegisterConcrete(&SignVoteMsg{}, "tendermint/socketpv/SignVoteMsg", nil)
cdc.RegisterConcrete(&SignProposalMsg{}, "tendermint/socketpv/SignProposalMsg", nil)
cdc.RegisterConcrete(&SignHeartbeatMsg{}, "tendermint/socketpv/SignHeartbeatMsg", nil)
}
// PubKeyMsg is a PrivValidatorSocket message containing the public key.
type PubKeyMsg struct {
PubKey crypto.PubKey
}
// SignVoteMsg is a PrivValidatorSocket message containing a vote.
type SignVoteMsg struct {
Vote *types.Vote
}
// SignProposalMsg is a PrivValidatorSocket message containing a Proposal.
type SignProposalMsg struct {
Proposal *types.Proposal
}
// SignHeartbeatMsg is a PrivValidatorSocket message containing a Heartbeat.
type SignHeartbeatMsg struct {
Heartbeat *types.Heartbeat
}
func readMsg(r io.Reader) (msg SocketPVMsg, err error) {
const maxSocketPVMsgSize = 1024 * 10
_, err = cdc.UnmarshalBinaryReader(r, &msg, maxSocketPVMsgSize)
if _, ok := err.(timeoutError); ok {
err = cmn.ErrorWrap(ErrConnTimeout, err.Error())
}
return
}
func writeMsg(w io.Writer, msg interface{}) (err error) {
_, err = cdc.MarshalBinaryWriter(w, msg)
if _, ok := err.(timeoutError); ok {
err = cmn.ErrorWrap(ErrConnTimeout, err.Error())
}
return
}