Close and retry a RemoteSigner on err (#2923)

* Close and recreate a RemoteSigner on err

* Update changelog

* Address Anton's comments / suggestions:

 - update changelog
 - restart TCPVal
 - shut down on `ErrUnexpectedResponse`

* re-init remote signer client with fresh connection if Ping fails

- add/update TODOs in secret connection
- rename tcp.go -> tcp_client.go, same with ipc to clarify their purpose

* account for `conn returned by waitConnection can be `nil`

- also add TODO about RemoteSigner conn field

* Tests for retrying: IPC / TCP

 - shorter info log on success
 - set conn and use it in tests to close conn

* Tests for retrying: IPC / TCP

 - shorter info log on success
 - set conn and use it in tests to close conn
 - add rwmutex for conn field in IPC

* comments and doc.go

* fix ipc tests. fixes #2677

* use constants for tests

* cleanup some error statements

* fixes #2784, race in tests

* remove print statement

* minor fixes from review

* update comment on sts spec

* cosmetics

* p2p/conn: add failing tests

* p2p/conn: make SecretConnection thread safe

* changelog

* IPCVal signer refactor

- use a .reset() method
- don't use embedded RemoteSignerClient
- guard RemoteSignerClient with mutex
- drop the .conn
- expose Close() on RemoteSignerClient

* apply IPCVal refactor to TCPVal

* remove mtx from RemoteSignerClient

* consolidate IPCVal and TCPVal, fixes #3104

- done in tcp_client.go
- now called SocketVal
- takes a listener in the constructor
- make tcpListener and unixListener contain all the differences

* delete ipc files

* introduce unix and tcp dialer for RemoteSigner

* rename files

- drop tcp_ prefix
- rename priv_validator.go to file.go

* bring back listener options

* fix node

* fix priv_val_server

* fix node test

* minor cleanup and comments
This commit is contained in:
Ismail Khoffi
2019-01-13 20:31:31 +01:00
committed by Ethan Buchman
parent ef94a322b8
commit a6011c007d
20 changed files with 710 additions and 871 deletions

View File

@ -10,8 +10,8 @@ Special thanks to external contributors on this release:
- [cli] Removed `node` `--proxy_app=dummy` option. Use `kvstore` (`persistent_kvstore`) instead. - [cli] Removed `node` `--proxy_app=dummy` option. Use `kvstore` (`persistent_kvstore`) instead.
- [cli] Renamed `node` `--proxy_app=nilapp` to `--proxy_app=noop`. - [cli] Renamed `node` `--proxy_app=nilapp` to `--proxy_app=noop`.
- [config] \#2992 `allow_duplicate_ip` is now set to false - [config] \#2992 `allow_duplicate_ip` is now set to false
- [privval] \#2926 split up `PubKeyMsg` into `PubKeyRequest` and `PubKeyResponse` to be consistent with other message types - [privval] \#2926 split up `PubKeyMsg` into `PubKeyRequest` and `PubKeyResponse` to be consistent with other message types
- [privval] \#2923 listen for unix socket connections instead of dialing them
* Apps * Apps
@ -23,13 +23,13 @@ Special thanks to external contributors on this release:
* Blockchain Protocol * Blockchain Protocol
* P2P Protocol * P2P Protocol
- multiple connections from the same IP are now disabled by default (see `allow_duplicate_ip` config option)
### FEATURES: ### FEATURES:
- [privval] \#1181 Split immutable and mutable parts of priv_validator.json - [privval] \#1181 Split immutable and mutable parts of `priv_validator.json`
### IMPROVEMENTS: ### IMPROVEMENTS:
- [p2p/conn] \#3111 make SecretConnection thread safe - [p2p/conn] \#3111 make SecretConnection thread safe
- [privval] \#2923 retry RemoteSigner connections on error
- [rpc] \#3047 Include peer's remote IP in `/net_info` - [rpc] \#3047 Include peer's remote IP in `/net_info`
### BUG FIXES: ### BUG FIXES:

View File

@ -3,6 +3,7 @@ package main
import ( import (
"flag" "flag"
"os" "os"
"time"
"github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
@ -34,13 +35,20 @@ func main() {
pv := privval.LoadFilePV(*privValKeyPath, *privValStatePath) pv := privval.LoadFilePV(*privValKeyPath, *privValStatePath)
rs := privval.NewRemoteSigner( var dialer privval.Dialer
logger, protocol, address := cmn.ProtocolAndAddress(*addr)
*chainID, switch protocol {
*addr, case "unix":
pv, dialer = privval.DialUnixFn(address)
ed25519.GenPrivKey(), case "tcp":
) connTimeout := 3 * time.Second // TODO
dialer = privval.DialTCPFn(address, connTimeout, ed25519.GenPrivKey())
default:
logger.Error("Unknown protocol", "protocol", protocol)
return
}
rs := privval.NewRemoteSigner(logger, *chainID, pv, dialer)
err := rs.Start() err := rs.Start()
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -878,16 +878,20 @@ func createAndStartPrivValidatorSocketClient(
listenAddr string, listenAddr string,
logger log.Logger, logger log.Logger,
) (types.PrivValidator, error) { ) (types.PrivValidator, error) {
var pvsc types.PrivValidator var listener net.Listener
protocol, address := cmn.ProtocolAndAddress(listenAddr) protocol, address := cmn.ProtocolAndAddress(listenAddr)
ln, err := net.Listen(protocol, address)
if err != nil {
return nil, err
}
switch protocol { switch protocol {
case "unix": case "unix":
pvsc = privval.NewIPCVal(logger.With("module", "privval"), address) listener = privval.NewUnixListener(ln)
case "tcp": case "tcp":
// TODO: persist this key so external signer // TODO: persist this key so external signer
// can actually authenticate us // can actually authenticate us
pvsc = privval.NewTCPVal(logger.With("module", "privval"), listenAddr, ed25519.GenPrivKey()) listener = privval.NewTCPListener(ln, ed25519.GenPrivKey())
default: default:
return nil, fmt.Errorf( return nil, fmt.Errorf(
"Wrong listen address: expected either 'tcp' or 'unix' protocols, got %s", "Wrong listen address: expected either 'tcp' or 'unix' protocols, got %s",
@ -895,11 +899,10 @@ func createAndStartPrivValidatorSocketClient(
) )
} }
if pvsc, ok := pvsc.(cmn.Service); ok { pvsc := privval.NewSocketVal(logger.With("module", "privval"), listener)
if err := pvsc.Start(); err != nil { if err := pvsc.Start(); err != nil {
return nil, errors.Wrap(err, "failed to start") return nil, errors.Wrap(err, "failed to start")
} }
}
return pvsc, nil return pvsc, nil
} }

View File

@ -122,25 +122,25 @@ func TestNodeSetPrivValTCP(t *testing.T) {
config := cfg.ResetTestRoot("node_priv_val_tcp_test") config := cfg.ResetTestRoot("node_priv_val_tcp_test")
config.BaseConfig.PrivValidatorListenAddr = addr config.BaseConfig.PrivValidatorListenAddr = addr
rs := privval.NewRemoteSigner( dialer := privval.DialTCPFn(addr, 100*time.Millisecond, ed25519.GenPrivKey())
pvsc := privval.NewRemoteSigner(
log.TestingLogger(), log.TestingLogger(),
config.ChainID(), config.ChainID(),
addr,
types.NewMockPV(), types.NewMockPV(),
ed25519.GenPrivKey(), dialer,
) )
privval.RemoteSignerConnDeadline(5 * time.Millisecond)(rs)
go func() { go func() {
err := rs.Start() err := pvsc.Start()
if err != nil { if err != nil {
panic(err) panic(err)
} }
}() }()
defer rs.Stop() defer pvsc.Stop()
n, err := DefaultNewNode(config, log.TestingLogger()) n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err) require.NoError(t, err)
assert.IsType(t, &privval.TCPVal{}, n.PrivValidator()) assert.IsType(t, &privval.SocketVal{}, n.PrivValidator())
} }
// address without a protocol must result in error // address without a protocol must result in error
@ -161,25 +161,25 @@ func TestNodeSetPrivValIPC(t *testing.T) {
config := cfg.ResetTestRoot("node_priv_val_tcp_test") config := cfg.ResetTestRoot("node_priv_val_tcp_test")
config.BaseConfig.PrivValidatorListenAddr = "unix://" + tmpfile config.BaseConfig.PrivValidatorListenAddr = "unix://" + tmpfile
rs := privval.NewIPCRemoteSigner( dialer := privval.DialUnixFn(tmpfile)
pvsc := privval.NewRemoteSigner(
log.TestingLogger(), log.TestingLogger(),
config.ChainID(), config.ChainID(),
tmpfile,
types.NewMockPV(), types.NewMockPV(),
dialer,
) )
privval.IPCRemoteSignerConnDeadline(3 * time.Second)(rs)
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
defer close(done) defer close(done)
n, err := DefaultNewNode(config, log.TestingLogger()) n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err) require.NoError(t, err)
assert.IsType(t, &privval.IPCVal{}, n.PrivValidator()) assert.IsType(t, &privval.SocketVal{}, n.PrivValidator())
}() }()
err := rs.Start() err := pvsc.Start()
require.NoError(t, err) require.NoError(t, err)
defer rs.Stop() defer pvsc.Stop()
<-done <-done
} }

263
privval/client.go Normal file
View File

@ -0,0 +1,263 @@
package privval
import (
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/tendermint/tendermint/crypto"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
const (
defaultConnHeartBeatSeconds = 2
defaultDialRetries = 10
)
// Socket errors.
var (
ErrUnexpectedResponse = errors.New("received unexpected response")
)
var (
connHeartbeat = time.Second * defaultConnHeartBeatSeconds
)
// SocketValOption sets an optional parameter on the SocketVal.
type SocketValOption func(*SocketVal)
// SocketValHeartbeat sets the period on which to check the liveness of the
// connected Signer connections.
func SocketValHeartbeat(period time.Duration) SocketValOption {
return func(sc *SocketVal) { sc.connHeartbeat = period }
}
// SocketVal implements PrivValidator.
// It listens for an external process to dial in and uses
// the socket to request signatures.
type SocketVal struct {
cmn.BaseService
listener net.Listener
// ping
cancelPing chan struct{}
pingTicker *time.Ticker
connHeartbeat time.Duration
// signer is mutable since it can be
// reset if the connection fails.
// failures are detected by a background
// ping routine.
// Methods on the underlying net.Conn itself
// are already gorountine safe.
mtx sync.RWMutex
signer *RemoteSignerClient
}
// Check that SocketVal implements PrivValidator.
var _ types.PrivValidator = (*SocketVal)(nil)
// NewSocketVal returns an instance of SocketVal.
func NewSocketVal(
logger log.Logger,
listener net.Listener,
) *SocketVal {
sc := &SocketVal{
listener: listener,
connHeartbeat: connHeartbeat,
}
sc.BaseService = *cmn.NewBaseService(logger, "SocketVal", sc)
return sc
}
//--------------------------------------------------------
// Implement PrivValidator
// GetPubKey implements PrivValidator.
func (sc *SocketVal) GetPubKey() crypto.PubKey {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
return sc.signer.GetPubKey()
}
// SignVote implements PrivValidator.
func (sc *SocketVal) SignVote(chainID string, vote *types.Vote) error {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
return sc.signer.SignVote(chainID, vote)
}
// SignProposal implements PrivValidator.
func (sc *SocketVal) SignProposal(chainID string, proposal *types.Proposal) error {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
return sc.signer.SignProposal(chainID, proposal)
}
//--------------------------------------------------------
// More thread safe methods proxied to the signer
// Ping is used to check connection health.
func (sc *SocketVal) Ping() error {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
return sc.signer.Ping()
}
// Close closes the underlying net.Conn.
func (sc *SocketVal) Close() {
sc.mtx.RLock()
defer sc.mtx.RUnlock()
if sc.signer != nil {
if err := sc.signer.Close(); err != nil {
sc.Logger.Error("OnStop", "err", err)
}
}
if sc.listener != nil {
if err := sc.listener.Close(); err != nil {
sc.Logger.Error("OnStop", "err", err)
}
}
}
//--------------------------------------------------------
// Service start and stop
// OnStart implements cmn.Service.
func (sc *SocketVal) OnStart() error {
if closed, err := sc.reset(); err != nil {
sc.Logger.Error("OnStart", "err", err)
return err
} else if closed {
return fmt.Errorf("listener is closed")
}
// Start a routine to keep the connection alive
sc.cancelPing = make(chan struct{}, 1)
sc.pingTicker = time.NewTicker(sc.connHeartbeat)
go func() {
for {
select {
case <-sc.pingTicker.C:
err := sc.Ping()
if err != nil {
sc.Logger.Error("Ping", "err", err)
if err == ErrUnexpectedResponse {
return
}
closed, err := sc.reset()
if err != nil {
sc.Logger.Error("Reconnecting to remote signer failed", "err", err)
continue
}
if closed {
sc.Logger.Info("listener is closing")
return
}
sc.Logger.Info("Re-created connection to remote signer", "impl", sc)
}
case <-sc.cancelPing:
sc.pingTicker.Stop()
return
}
}
}()
return nil
}
// OnStop implements cmn.Service.
func (sc *SocketVal) OnStop() {
if sc.cancelPing != nil {
close(sc.cancelPing)
}
sc.Close()
}
//--------------------------------------------------------
// Connection and signer management
// waits to accept and sets a new connection.
// connection is closed in OnStop.
// returns true if the listener is closed
// (ie. it returns a nil conn).
func (sc *SocketVal) reset() (bool, error) {
sc.mtx.Lock()
defer sc.mtx.Unlock()
// first check if the conn already exists and close it.
if sc.signer != nil {
if err := sc.signer.Close(); err != nil {
sc.Logger.Error("error closing connection", "err", err)
}
}
// wait for a new conn
conn, err := sc.waitConnection()
if err != nil {
return false, err
}
// listener is closed
if conn == nil {
return true, nil
}
sc.signer, err = NewRemoteSignerClient(conn)
if err != nil {
// failed to fetch the pubkey. close out the connection.
if err := conn.Close(); err != nil {
sc.Logger.Error("error closing connection", "err", err)
}
return false, err
}
return false, nil
}
func (sc *SocketVal) acceptConnection() (net.Conn, error) {
conn, err := sc.listener.Accept()
if err != nil {
if !sc.IsRunning() {
return nil, nil // Ignore error from listener closing.
}
return nil, err
}
return conn, nil
}
// waitConnection uses the configured wait timeout to error if no external
// process connects in the time period.
func (sc *SocketVal) waitConnection() (net.Conn, error) {
var (
connc = make(chan net.Conn, 1)
errc = make(chan error, 1)
)
go func(connc chan<- net.Conn, errc chan<- error) {
conn, err := sc.acceptConnection()
if err != nil {
errc <- err
return
}
connc <- conn
}(connc, errc)
select {
case conn := <-connc:
return conn, nil
case err := <-errc:
return nil, err
}
}

View File

@ -17,6 +17,16 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
var (
testAcceptDeadline = defaultAcceptDeadlineSeconds * time.Second
testConnDeadline = 100 * time.Millisecond
testConnDeadline2o3 = 66 * time.Millisecond // 2/3 of the other one
testHeartbeatTimeout = 10 * time.Millisecond
testHeartbeatTimeout3o2 = 6 * time.Millisecond // 3/2 of the other one
)
func TestSocketPVAddress(t *testing.T) { func TestSocketPVAddress(t *testing.T) {
var ( var (
chainID = cmn.RandStr(12) chainID = cmn.RandStr(12)
@ -39,8 +49,7 @@ func TestSocketPVPubKey(t *testing.T) {
defer sc.Stop() defer sc.Stop()
defer rs.Stop() defer rs.Stop()
clientKey, err := sc.getPubKey() clientKey := sc.GetPubKey()
require.NoError(t, err)
privvalPubKey := rs.privVal.GetPubKey() privvalPubKey := rs.privVal.GetPubKey()
@ -95,14 +104,14 @@ func TestSocketPVVoteResetDeadline(t *testing.T) {
defer sc.Stop() defer sc.Stop()
defer rs.Stop() defer rs.Stop()
time.Sleep(3 * time.Millisecond) time.Sleep(testConnDeadline2o3)
require.NoError(t, rs.privVal.SignVote(chainID, want)) require.NoError(t, rs.privVal.SignVote(chainID, want))
require.NoError(t, sc.SignVote(chainID, have)) require.NoError(t, sc.SignVote(chainID, have))
assert.Equal(t, want.Signature, have.Signature) assert.Equal(t, want.Signature, have.Signature)
// This would exceed the deadline if it was not extended by the previous message // This would exceed the deadline if it was not extended by the previous message
time.Sleep(3 * time.Millisecond) time.Sleep(testConnDeadline2o3)
require.NoError(t, rs.privVal.SignVote(chainID, want)) require.NoError(t, rs.privVal.SignVote(chainID, want))
require.NoError(t, sc.SignVote(chainID, have)) require.NoError(t, sc.SignVote(chainID, have))
@ -122,7 +131,7 @@ func TestSocketPVVoteKeepalive(t *testing.T) {
defer sc.Stop() defer sc.Stop()
defer rs.Stop() defer rs.Stop()
time.Sleep(10 * time.Millisecond) time.Sleep(testConnDeadline * 2)
require.NoError(t, rs.privVal.SignVote(chainID, want)) require.NoError(t, rs.privVal.SignVote(chainID, want))
require.NoError(t, sc.SignVote(chainID, have)) require.NoError(t, sc.SignVote(chainID, have))
@ -133,16 +142,11 @@ func TestSocketPVDeadline(t *testing.T) {
var ( var (
addr = testFreeAddr(t) addr = testFreeAddr(t)
listenc = make(chan struct{}) listenc = make(chan struct{})
sc = NewTCPVal( thisConnTimeout = 100 * time.Millisecond
log.TestingLogger(), sc = newSocketVal(log.TestingLogger(), addr, thisConnTimeout)
addr,
ed25519.GenPrivKey(),
)
) )
TCPValConnTimeout(100 * time.Millisecond)(sc) go func(sc *SocketVal) {
go func(sc *TCPVal) {
defer close(listenc) defer close(listenc)
assert.Equal(t, sc.Start().(cmn.Error).Data(), ErrConnTimeout) assert.Equal(t, sc.Start().(cmn.Error).Data(), ErrConnTimeout)
@ -199,9 +203,8 @@ func TestRemoteSignerRetry(t *testing.T) {
rs := NewRemoteSigner( rs := NewRemoteSigner(
log.TestingLogger(), log.TestingLogger(),
cmn.RandStr(12), cmn.RandStr(12),
ln.Addr().String(),
types.NewMockPV(), types.NewMockPV(),
ed25519.GenPrivKey(), DialTCPFn(ln.Addr().String(), testConnDeadline, ed25519.GenPrivKey()),
) )
defer rs.Stop() defer rs.Stop()
@ -230,15 +233,8 @@ func TestRemoteSignVoteErrors(t *testing.T) {
defer sc.Stop() defer sc.Stop()
defer rs.Stop() defer rs.Stop()
err := writeMsg(sc.conn, &SignVoteRequest{Vote: vote}) err := sc.SignVote("", vote)
require.NoError(t, err) require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
res, err := readMsg(sc.conn)
require.NoError(t, err)
resp := *res.(*SignedVoteResponse)
require.NotNil(t, resp.Error)
require.Equal(t, resp.Error.Description, types.ErroringMockPVErr.Error())
err = rs.privVal.SignVote(chainID, vote) err = rs.privVal.SignVote(chainID, vote)
require.Error(t, err) require.Error(t, err)
@ -257,15 +253,8 @@ func TestRemoteSignProposalErrors(t *testing.T) {
defer sc.Stop() defer sc.Stop()
defer rs.Stop() defer rs.Stop()
err := writeMsg(sc.conn, &SignProposalRequest{Proposal: proposal}) err := sc.SignProposal("", proposal)
require.NoError(t, err) require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
res, err := readMsg(sc.conn)
require.NoError(t, err)
resp := *res.(*SignedProposalResponse)
require.NotNil(t, resp.Error)
require.Equal(t, resp.Error.Description, types.ErroringMockPVErr.Error())
err = rs.privVal.SignProposal(chainID, proposal) err = rs.privVal.SignProposal(chainID, proposal)
require.Error(t, err) require.Error(t, err)
@ -285,15 +274,10 @@ func TestErrUnexpectedResponse(t *testing.T) {
rs = NewRemoteSigner( rs = NewRemoteSigner(
logger, logger,
chainID, chainID,
addr,
types.NewMockPV(), types.NewMockPV(),
ed25519.GenPrivKey(), DialTCPFn(addr, testConnDeadline, ed25519.GenPrivKey()),
)
sc = NewTCPVal(
logger,
addr,
ed25519.GenPrivKey(),
) )
sc = newSocketVal(logger, addr, testConnDeadline)
) )
testStartSocketPV(t, readyc, sc) testStartSocketPV(t, readyc, sc)
@ -331,11 +315,73 @@ func TestErrUnexpectedResponse(t *testing.T) {
require.Equal(t, err, ErrUnexpectedResponse) require.Equal(t, err, ErrUnexpectedResponse)
} }
func TestRetryTCPConnToRemoteSigner(t *testing.T) {
var (
addr = testFreeAddr(t)
logger = log.TestingLogger()
chainID = cmn.RandStr(12)
readyc = make(chan struct{})
rs = NewRemoteSigner(
logger,
chainID,
types.NewMockPV(),
DialTCPFn(addr, testConnDeadline, ed25519.GenPrivKey()),
)
thisConnTimeout = testConnDeadline
sc = newSocketVal(logger, addr, thisConnTimeout)
)
// Ping every:
SocketValHeartbeat(testHeartbeatTimeout)(sc)
RemoteSignerConnDeadline(testConnDeadline)(rs)
RemoteSignerConnRetries(10)(rs)
testStartSocketPV(t, readyc, sc)
defer sc.Stop()
require.NoError(t, rs.Start())
assert.True(t, rs.IsRunning())
<-readyc
time.Sleep(testHeartbeatTimeout * 2)
rs.Stop()
rs2 := NewRemoteSigner(
logger,
chainID,
types.NewMockPV(),
DialTCPFn(addr, testConnDeadline, ed25519.GenPrivKey()),
)
// let some pings pass
time.Sleep(testHeartbeatTimeout3o2)
require.NoError(t, rs2.Start())
assert.True(t, rs2.IsRunning())
defer rs2.Stop()
// give the client some time to re-establish the conn to the remote signer
// should see sth like this in the logs:
//
// E[10016-01-10|17:12:46.128] Ping err="remote signer timed out"
// I[10016-01-10|17:16:42.447] Re-created connection to remote signer impl=SocketVal
time.Sleep(testConnDeadline * 2)
}
func newSocketVal(logger log.Logger, addr string, connDeadline time.Duration) *SocketVal {
ln, err := net.Listen(cmn.ProtocolAndAddress(addr))
if err != nil {
panic(err)
}
tcpLn := NewTCPListener(ln, ed25519.GenPrivKey())
TCPListenerAcceptDeadline(testAcceptDeadline)(tcpLn)
TCPListenerConnDeadline(testConnDeadline)(tcpLn)
return NewSocketVal(logger, tcpLn)
}
func testSetupSocketPair( func testSetupSocketPair(
t *testing.T, t *testing.T,
chainID string, chainID string,
privValidator types.PrivValidator, privValidator types.PrivValidator,
) (*TCPVal, *RemoteSigner) { ) (*SocketVal, *RemoteSigner) {
var ( var (
addr = testFreeAddr(t) addr = testFreeAddr(t)
logger = log.TestingLogger() logger = log.TestingLogger()
@ -344,20 +390,16 @@ func testSetupSocketPair(
rs = NewRemoteSigner( rs = NewRemoteSigner(
logger, logger,
chainID, chainID,
addr,
privVal, privVal,
ed25519.GenPrivKey(), DialTCPFn(addr, testConnDeadline, ed25519.GenPrivKey()),
)
sc = NewTCPVal(
logger,
addr,
ed25519.GenPrivKey(),
)
) )
TCPValConnTimeout(5 * time.Millisecond)(sc) thisConnTimeout = testConnDeadline
TCPValHeartbeat(2 * time.Millisecond)(sc) sc = newSocketVal(logger, addr, thisConnTimeout)
RemoteSignerConnDeadline(5 * time.Millisecond)(rs) )
SocketValHeartbeat(testHeartbeatTimeout)(sc)
RemoteSignerConnDeadline(testConnDeadline)(rs)
RemoteSignerConnRetries(1e6)(rs) RemoteSignerConnRetries(1e6)(rs)
testStartSocketPV(t, readyc, sc) testStartSocketPV(t, readyc, sc)
@ -378,8 +420,8 @@ func testReadWriteResponse(t *testing.T, resp RemoteSignerMsg, rsConn net.Conn)
require.NoError(t, err) require.NoError(t, err)
} }
func testStartSocketPV(t *testing.T, readyc chan struct{}, sc *TCPVal) { func testStartSocketPV(t *testing.T, readyc chan struct{}, sc *SocketVal) {
go func(sc *TCPVal) { go func(sc *SocketVal) {
require.NoError(t, sc.Start()) require.NoError(t, sc.Start())
assert.True(t, sc.IsRunning()) assert.True(t, sc.IsRunning())

21
privval/doc.go Normal file
View File

@ -0,0 +1,21 @@
/*
Package privval provides different implementations of the types.PrivValidator.
FilePV
FilePV is the simplest implementation and developer default. It uses one file for the private key and another to store state.
SocketVal
SocketVal establishes a connection to an external process, like a Key Management Server (KMS), using a socket.
SocketVal listens for the external KMS process to dial in.
SocketVal takes a listener, which determines the type of connection
(ie. encrypted over tcp, or unencrypted over unix).
RemoteSigner
RemoteSigner is a simple wrapper around a net.Conn. It's used by both IPCVal and TCPVal.
*/
package privval

View File

@ -22,6 +22,7 @@ const (
stepPrecommit int8 = 3 stepPrecommit int8 = 3
) )
// A vote is either stepPrevote or stepPrecommit.
func voteToStep(vote *types.Vote) int8 { func voteToStep(vote *types.Vote) int8 {
switch vote.Type { switch vote.Type {
case types.PrevoteType: case types.PrevoteType:
@ -29,7 +30,7 @@ func voteToStep(vote *types.Vote) int8 {
case types.PrecommitType: case types.PrecommitType:
return stepPrecommit return stepPrecommit
default: default:
cmn.PanicSanity("Unknown vote type") panic("Unknown vote type")
return 0 return 0
} }
} }

View File

@ -1,123 +0,0 @@
package privval
import (
"net"
"time"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
// IPCValOption sets an optional parameter on the SocketPV.
type IPCValOption func(*IPCVal)
// IPCValConnTimeout sets the read and write timeout for connections
// from external signing processes.
func IPCValConnTimeout(timeout time.Duration) IPCValOption {
return func(sc *IPCVal) { sc.connTimeout = timeout }
}
// IPCValHeartbeat sets the period on which to check the liveness of the
// connected Signer connections.
func IPCValHeartbeat(period time.Duration) IPCValOption {
return func(sc *IPCVal) { sc.connHeartbeat = period }
}
// IPCVal implements PrivValidator, it uses a unix socket to request signatures
// from an external process.
type IPCVal struct {
cmn.BaseService
*RemoteSignerClient
addr string
connTimeout time.Duration
connHeartbeat time.Duration
conn net.Conn
cancelPing chan struct{}
pingTicker *time.Ticker
}
// Check that IPCVal implements PrivValidator.
var _ types.PrivValidator = (*IPCVal)(nil)
// NewIPCVal returns an instance of IPCVal.
func NewIPCVal(
logger log.Logger,
socketAddr string,
) *IPCVal {
sc := &IPCVal{
addr: socketAddr,
connTimeout: connTimeout,
connHeartbeat: connHeartbeat,
}
sc.BaseService = *cmn.NewBaseService(logger, "IPCVal", sc)
return sc
}
// OnStart implements cmn.Service.
func (sc *IPCVal) OnStart() error {
err := sc.connect()
if err != nil {
sc.Logger.Error("OnStart", "err", err)
return err
}
sc.RemoteSignerClient, err = NewRemoteSignerClient(sc.conn)
if err != nil {
return err
}
// Start a routine to keep the connection alive
sc.cancelPing = make(chan struct{}, 1)
sc.pingTicker = time.NewTicker(sc.connHeartbeat)
go func() {
for {
select {
case <-sc.pingTicker.C:
err := sc.Ping()
if err != nil {
sc.Logger.Error("Ping", "err", err)
}
case <-sc.cancelPing:
sc.pingTicker.Stop()
return
}
}
}()
return nil
}
// OnStop implements cmn.Service.
func (sc *IPCVal) OnStop() {
if sc.cancelPing != nil {
close(sc.cancelPing)
}
if sc.conn != nil {
if err := sc.conn.Close(); err != nil {
sc.Logger.Error("OnStop", "err", err)
}
}
}
func (sc *IPCVal) connect() error {
la, err := net.ResolveUnixAddr("unix", sc.addr)
if err != nil {
return err
}
conn, err := net.DialUnix("unix", nil, la)
if err != nil {
return err
}
sc.conn = newTimeoutConn(conn, sc.connTimeout)
return nil
}

View File

@ -1,132 +0,0 @@
package privval
import (
"io"
"net"
"time"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
// IPCRemoteSignerOption sets an optional parameter on the IPCRemoteSigner.
type IPCRemoteSignerOption func(*IPCRemoteSigner)
// IPCRemoteSignerConnDeadline sets the read and write deadline for connections
// from external signing processes.
func IPCRemoteSignerConnDeadline(deadline time.Duration) IPCRemoteSignerOption {
return func(ss *IPCRemoteSigner) { ss.connDeadline = deadline }
}
// IPCRemoteSignerConnRetries sets the amount of attempted retries to connect.
func IPCRemoteSignerConnRetries(retries int) IPCRemoteSignerOption {
return func(ss *IPCRemoteSigner) { ss.connRetries = retries }
}
// IPCRemoteSigner is a RPC implementation of PrivValidator that listens on a unix socket.
type IPCRemoteSigner struct {
cmn.BaseService
addr string
chainID string
connDeadline time.Duration
connRetries int
privVal types.PrivValidator
listener *net.UnixListener
}
// NewIPCRemoteSigner returns an instance of IPCRemoteSigner.
func NewIPCRemoteSigner(
logger log.Logger,
chainID, socketAddr string,
privVal types.PrivValidator,
) *IPCRemoteSigner {
rs := &IPCRemoteSigner{
addr: socketAddr,
chainID: chainID,
connDeadline: time.Second * defaultConnDeadlineSeconds,
connRetries: defaultDialRetries,
privVal: privVal,
}
rs.BaseService = *cmn.NewBaseService(logger, "IPCRemoteSigner", rs)
return rs
}
// OnStart implements cmn.Service.
func (rs *IPCRemoteSigner) OnStart() error {
err := rs.listen()
if err != nil {
err = cmn.ErrorWrap(err, "listen")
rs.Logger.Error("OnStart", "err", err)
return err
}
go func() {
for {
conn, err := rs.listener.AcceptUnix()
if err != nil {
rs.Logger.Error("AcceptUnix", "err", err)
return
}
go rs.handleConnection(conn)
}
}()
return nil
}
// OnStop implements cmn.Service.
func (rs *IPCRemoteSigner) OnStop() {
if rs.listener != nil {
if err := rs.listener.Close(); err != nil {
rs.Logger.Error("OnStop", "err", cmn.ErrorWrap(err, "closing listener failed"))
}
}
}
func (rs *IPCRemoteSigner) listen() error {
la, err := net.ResolveUnixAddr("unix", rs.addr)
if err != nil {
return err
}
rs.listener, err = net.ListenUnix("unix", la)
return err
}
func (rs *IPCRemoteSigner) handleConnection(conn net.Conn) {
for {
if !rs.IsRunning() {
return // Ignore error from listener closing.
}
// Reset the connection deadline
conn.SetDeadline(time.Now().Add(rs.connDeadline))
req, err := readMsg(conn)
if err != nil {
if err != io.EOF {
rs.Logger.Error("handleConnection", "err", err)
}
return
}
res, err := handleRequest(req, rs.chainID, rs.privVal)
if err != nil {
// only log the error; we'll reply with an error in res
rs.Logger.Error("handleConnection", "err", err)
}
err = writeMsg(conn, res)
if err != nil {
rs.Logger.Error("handleConnection", "err", err)
return
}
}
}

View File

@ -1,147 +0,0 @@
package privval
import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
func TestIPCPVVote(t *testing.T) {
var (
chainID = cmn.RandStr(12)
sc, rs = testSetupIPCSocketPair(t, chainID, types.NewMockPV())
ts = time.Now()
vType = types.PrecommitType
want = &types.Vote{Timestamp: ts, Type: vType}
have = &types.Vote{Timestamp: ts, Type: vType}
)
defer sc.Stop()
defer rs.Stop()
require.NoError(t, rs.privVal.SignVote(chainID, want))
require.NoError(t, sc.SignVote(chainID, have))
assert.Equal(t, want.Signature, have.Signature)
}
func TestIPCPVVoteResetDeadline(t *testing.T) {
var (
chainID = cmn.RandStr(12)
sc, rs = testSetupIPCSocketPair(t, chainID, types.NewMockPV())
ts = time.Now()
vType = types.PrecommitType
want = &types.Vote{Timestamp: ts, Type: vType}
have = &types.Vote{Timestamp: ts, Type: vType}
)
defer sc.Stop()
defer rs.Stop()
time.Sleep(3 * time.Millisecond)
require.NoError(t, rs.privVal.SignVote(chainID, want))
require.NoError(t, sc.SignVote(chainID, have))
assert.Equal(t, want.Signature, have.Signature)
// This would exceed the deadline if it was not extended by the previous message
time.Sleep(3 * time.Millisecond)
require.NoError(t, rs.privVal.SignVote(chainID, want))
require.NoError(t, sc.SignVote(chainID, have))
assert.Equal(t, want.Signature, have.Signature)
}
func TestIPCPVVoteKeepalive(t *testing.T) {
var (
chainID = cmn.RandStr(12)
sc, rs = testSetupIPCSocketPair(t, chainID, types.NewMockPV())
ts = time.Now()
vType = types.PrecommitType
want = &types.Vote{Timestamp: ts, Type: vType}
have = &types.Vote{Timestamp: ts, Type: vType}
)
defer sc.Stop()
defer rs.Stop()
time.Sleep(10 * time.Millisecond)
require.NoError(t, rs.privVal.SignVote(chainID, want))
require.NoError(t, sc.SignVote(chainID, have))
assert.Equal(t, want.Signature, have.Signature)
}
func testSetupIPCSocketPair(
t *testing.T,
chainID string,
privValidator types.PrivValidator,
) (*IPCVal, *IPCRemoteSigner) {
addr, err := testUnixAddr()
require.NoError(t, err)
var (
logger = log.TestingLogger()
privVal = privValidator
readyc = make(chan struct{})
rs = NewIPCRemoteSigner(
logger,
chainID,
addr,
privVal,
)
sc = NewIPCVal(
logger,
addr,
)
)
IPCValConnTimeout(5 * time.Millisecond)(sc)
IPCValHeartbeat(time.Millisecond)(sc)
IPCRemoteSignerConnDeadline(time.Millisecond * 5)(rs)
testStartIPCRemoteSigner(t, readyc, rs)
<-readyc
require.NoError(t, sc.Start())
assert.True(t, sc.IsRunning())
return sc, rs
}
func testStartIPCRemoteSigner(t *testing.T, readyc chan struct{}, rs *IPCRemoteSigner) {
go func(rs *IPCRemoteSigner) {
require.NoError(t, rs.Start())
assert.True(t, rs.IsRunning())
readyc <- struct{}{}
}(rs)
}
func testUnixAddr() (string, error) {
f, err := ioutil.TempFile("/tmp", "nettest")
if err != nil {
return "", err
}
addr := f.Name()
err = f.Close()
if err != nil {
return "", err
}
err = os.Remove(addr)
if err != nil {
return "", err
}
return addr, nil
}

View File

@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"sync"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -14,31 +13,41 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
// RemoteSignerClient implements PrivValidator, it uses a socket to request signatures // Socket errors.
var (
ErrConnTimeout = errors.New("remote signer timed out")
)
// RemoteSignerClient implements PrivValidator.
// It uses a net.Conn to request signatures
// from an external process. // from an external process.
type RemoteSignerClient struct { type RemoteSignerClient struct {
conn net.Conn conn net.Conn
// memoized
consensusPubKey crypto.PubKey consensusPubKey crypto.PubKey
mtx sync.Mutex
} }
// Check that RemoteSignerClient implements PrivValidator. // Check that RemoteSignerClient implements PrivValidator.
var _ types.PrivValidator = (*RemoteSignerClient)(nil) var _ types.PrivValidator = (*RemoteSignerClient)(nil)
// NewRemoteSignerClient returns an instance of RemoteSignerClient. // NewRemoteSignerClient returns an instance of RemoteSignerClient.
func NewRemoteSignerClient( func NewRemoteSignerClient(conn net.Conn) (*RemoteSignerClient, error) {
conn net.Conn,
) (*RemoteSignerClient, error) { // retrieve and memoize the consensus public key once.
sc := &RemoteSignerClient{ pubKey, err := getPubKey(conn)
conn: conn,
}
pubKey, err := sc.getPubKey()
if err != nil { if err != nil {
return nil, cmn.ErrorWrap(err, "error while retrieving public key for remote signer") return nil, cmn.ErrorWrap(err, "error while retrieving public key for remote signer")
} }
// retrieve and memoize the consensus public key once: return &RemoteSignerClient{
sc.consensusPubKey = pubKey conn: conn,
return sc, nil consensusPubKey: pubKey,
}, nil
}
// Close calls Close on the underlying net.Conn.
func (sc *RemoteSignerClient) Close() error {
return sc.conn.Close()
} }
// GetPubKey implements PrivValidator. // GetPubKey implements PrivValidator.
@ -46,16 +55,14 @@ func (sc *RemoteSignerClient) GetPubKey() crypto.PubKey {
return sc.consensusPubKey return sc.consensusPubKey
} }
func (sc *RemoteSignerClient) getPubKey() (crypto.PubKey, error) { // not thread-safe (only called on startup).
sc.mtx.Lock() func getPubKey(conn net.Conn) (crypto.PubKey, error) {
defer sc.mtx.Unlock() err := writeMsg(conn, &PubKeyRequest{})
err := writeMsg(sc.conn, &PubKeyRequest{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
res, err := readMsg(sc.conn) res, err := readMsg(conn)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -73,9 +80,6 @@ func (sc *RemoteSignerClient) getPubKey() (crypto.PubKey, error) {
// SignVote implements PrivValidator. // SignVote implements PrivValidator.
func (sc *RemoteSignerClient) SignVote(chainID string, vote *types.Vote) error { func (sc *RemoteSignerClient) SignVote(chainID string, vote *types.Vote) error {
sc.mtx.Lock()
defer sc.mtx.Unlock()
err := writeMsg(sc.conn, &SignVoteRequest{Vote: vote}) err := writeMsg(sc.conn, &SignVoteRequest{Vote: vote})
if err != nil { if err != nil {
return err return err
@ -103,9 +107,6 @@ func (sc *RemoteSignerClient) SignProposal(
chainID string, chainID string,
proposal *types.Proposal, proposal *types.Proposal,
) error { ) error {
sc.mtx.Lock()
defer sc.mtx.Unlock()
err := writeMsg(sc.conn, &SignProposalRequest{Proposal: proposal}) err := writeMsg(sc.conn, &SignProposalRequest{Proposal: proposal})
if err != nil { if err != nil {
return err return err
@ -129,9 +130,6 @@ func (sc *RemoteSignerClient) SignProposal(
// Ping is used to check connection health. // Ping is used to check connection health.
func (sc *RemoteSignerClient) Ping() error { func (sc *RemoteSignerClient) Ping() error {
sc.mtx.Lock()
defer sc.mtx.Unlock()
err := writeMsg(sc.conn, &PingRequest{}) err := writeMsg(sc.conn, &PingRequest{})
if err != nil { if err != nil {
return err return err

View File

@ -5,6 +5,7 @@ import (
"net" "net"
"time" "time"
"github.com/pkg/errors"
"github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log" "github.com/tendermint/tendermint/libs/log"
@ -12,6 +13,11 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
// Socket errors.
var (
ErrDialRetryMax = errors.New("dialed maximum retries")
)
// RemoteSignerOption sets an optional parameter on the RemoteSigner. // RemoteSignerOption sets an optional parameter on the RemoteSigner.
type RemoteSignerOption func(*RemoteSigner) type RemoteSignerOption func(*RemoteSigner)
@ -26,38 +32,64 @@ func RemoteSignerConnRetries(retries int) RemoteSignerOption {
return func(ss *RemoteSigner) { ss.connRetries = retries } return func(ss *RemoteSigner) { ss.connRetries = retries }
} }
// RemoteSigner implements PrivValidator by dialing to a socket. // RemoteSigner dials using its dialer and responds to any
// signature requests using its privVal.
type RemoteSigner struct { type RemoteSigner struct {
cmn.BaseService cmn.BaseService
addr string
chainID string chainID string
connDeadline time.Duration connDeadline time.Duration
connRetries int connRetries int
privKey ed25519.PrivKeyEd25519
privVal types.PrivValidator privVal types.PrivValidator
dialer Dialer
conn net.Conn conn net.Conn
} }
// NewRemoteSigner returns an instance of RemoteSigner. // Dialer dials a remote address and returns a net.Conn or an error.
type Dialer func() (net.Conn, error)
// DialTCPFn dials the given tcp addr, using the given connTimeout and privKey for the
// authenticated encryption handshake.
func DialTCPFn(addr string, connTimeout time.Duration, privKey ed25519.PrivKeyEd25519) Dialer {
return func() (net.Conn, error) {
conn, err := cmn.Connect(addr)
if err == nil {
err = conn.SetDeadline(time.Now().Add(connTimeout))
}
if err == nil {
conn, err = p2pconn.MakeSecretConnection(conn, privKey)
}
return conn, err
}
}
// DialUnixFn dials the given unix socket.
func DialUnixFn(addr string) Dialer {
return func() (net.Conn, error) {
unixAddr := &net.UnixAddr{addr, "unix"}
return net.DialUnix("unix", nil, unixAddr)
}
}
// NewRemoteSigner return a RemoteSigner that will dial using the given
// dialer and respond to any signature requests over the connection
// using the given privVal.
func NewRemoteSigner( func NewRemoteSigner(
logger log.Logger, logger log.Logger,
chainID, socketAddr string, chainID string,
privVal types.PrivValidator, privVal types.PrivValidator,
privKey ed25519.PrivKeyEd25519, dialer Dialer,
) *RemoteSigner { ) *RemoteSigner {
rs := &RemoteSigner{ rs := &RemoteSigner{
addr: socketAddr,
chainID: chainID, chainID: chainID,
connDeadline: time.Second * defaultConnDeadlineSeconds, connDeadline: time.Second * defaultConnDeadlineSeconds,
connRetries: defaultDialRetries, connRetries: defaultDialRetries,
privKey: privKey,
privVal: privVal, privVal: privVal,
dialer: dialer,
} }
rs.BaseService = *cmn.NewBaseService(logger, "RemoteSigner", rs) rs.BaseService = *cmn.NewBaseService(logger, "RemoteSigner", rs)
return rs return rs
} }
@ -68,6 +100,7 @@ func (rs *RemoteSigner) OnStart() error {
rs.Logger.Error("OnStart", "err", err) rs.Logger.Error("OnStart", "err", err)
return err return err
} }
rs.conn = conn
go rs.handleConnection(conn) go rs.handleConnection(conn)
@ -91,36 +124,11 @@ func (rs *RemoteSigner) connect() (net.Conn, error) {
if retries != rs.connRetries { if retries != rs.connRetries {
time.Sleep(rs.connDeadline) time.Sleep(rs.connDeadline)
} }
conn, err := rs.dialer()
conn, err := cmn.Connect(rs.addr)
if err != nil { if err != nil {
rs.Logger.Error( rs.Logger.Error("dialing", "err", err)
"connect",
"addr", rs.addr,
"err", err,
)
continue continue
} }
if err := conn.SetDeadline(time.Now().Add(connTimeout)); err != nil {
rs.Logger.Error(
"connect",
"err", err,
)
continue
}
conn, err = p2pconn.MakeSecretConnection(conn, rs.privKey)
if err != nil {
rs.Logger.Error(
"connect",
"err", err,
)
continue
}
return conn, nil return conn, nil
} }
@ -139,7 +147,7 @@ func (rs *RemoteSigner) handleConnection(conn net.Conn) {
req, err := readMsg(conn) req, err := readMsg(conn)
if err != nil { if err != nil {
if err != io.EOF { if err != io.EOF {
rs.Logger.Error("handleConnection", "err", err) rs.Logger.Error("handleConnection readMsg", "err", err)
} }
return return
} }
@ -148,12 +156,12 @@ func (rs *RemoteSigner) handleConnection(conn net.Conn) {
if err != nil { if err != nil {
// only log the error; we'll reply with an error in res // only log the error; we'll reply with an error in res
rs.Logger.Error("handleConnection", "err", err) rs.Logger.Error("handleConnection handleRequest", "err", err)
} }
err = writeMsg(conn, res) err = writeMsg(conn, res)
if err != nil { if err != nil {
rs.Logger.Error("handleConnection", "err", err) rs.Logger.Error("handleConnection writeMsg", "err", err)
return return
} }
} }

184
privval/socket.go Normal file
View File

@ -0,0 +1,184 @@
package privval
import (
"net"
"time"
"github.com/tendermint/tendermint/crypto/ed25519"
p2pconn "github.com/tendermint/tendermint/p2p/conn"
)
const (
defaultAcceptDeadlineSeconds = 3
defaultConnDeadlineSeconds = 3
)
// timeoutError can be used to check if an error returned from the netp package
// was due to a timeout.
type timeoutError interface {
Timeout() bool
}
//------------------------------------------------------------------
// TCP Listener
// TCPListenerOption sets an optional parameter on the tcpListener.
type TCPListenerOption func(*tcpListener)
// TCPListenerAcceptDeadline sets the deadline for the listener.
// A zero time value disables the deadline.
func TCPListenerAcceptDeadline(deadline time.Duration) TCPListenerOption {
return func(tl *tcpListener) { tl.acceptDeadline = deadline }
}
// TCPListenerConnDeadline sets the read and write deadline for connections
// from external signing processes.
func TCPListenerConnDeadline(deadline time.Duration) TCPListenerOption {
return func(tl *tcpListener) { tl.connDeadline = deadline }
}
// tcpListener implements net.Listener.
var _ net.Listener = (*tcpListener)(nil)
// tcpListener wraps a *net.TCPListener to standardise protocol timeouts
// and potentially other tuning parameters. It also returns encrypted connections.
type tcpListener struct {
*net.TCPListener
secretConnKey ed25519.PrivKeyEd25519
acceptDeadline time.Duration
connDeadline time.Duration
}
// NewTCPListener returns a listener that accepts authenticated encrypted connections
// using the given secretConnKey and the default timeout values.
func NewTCPListener(ln net.Listener, secretConnKey ed25519.PrivKeyEd25519) *tcpListener {
return &tcpListener{
TCPListener: ln.(*net.TCPListener),
secretConnKey: secretConnKey,
acceptDeadline: time.Second * defaultAcceptDeadlineSeconds,
connDeadline: time.Second * defaultConnDeadlineSeconds,
}
}
// Accept implements net.Listener.
func (ln *tcpListener) Accept() (net.Conn, error) {
err := ln.SetDeadline(time.Now().Add(ln.acceptDeadline))
if err != nil {
return nil, err
}
tc, err := ln.AcceptTCP()
if err != nil {
return nil, err
}
// Wrap the conn in our timeout and encryption wrappers
timeoutConn := newTimeoutConn(tc, ln.connDeadline)
secretConn, err := p2pconn.MakeSecretConnection(timeoutConn, ln.secretConnKey)
if err != nil {
return nil, err
}
return secretConn, nil
}
//------------------------------------------------------------------
// Unix Listener
// unixListener implements net.Listener.
var _ net.Listener = (*unixListener)(nil)
type UnixListenerOption func(*unixListener)
// UnixListenerAcceptDeadline sets the deadline for the listener.
// A zero time value disables the deadline.
func UnixListenerAcceptDeadline(deadline time.Duration) UnixListenerOption {
return func(ul *unixListener) { ul.acceptDeadline = deadline }
}
// UnixListenerConnDeadline sets the read and write deadline for connections
// from external signing processes.
func UnixListenerConnDeadline(deadline time.Duration) UnixListenerOption {
return func(ul *unixListener) { ul.connDeadline = deadline }
}
// unixListener wraps a *net.UnixListener to standardise protocol timeouts
// and potentially other tuning parameters. It returns unencrypted connections.
type unixListener struct {
*net.UnixListener
acceptDeadline time.Duration
connDeadline time.Duration
}
// NewUnixListener returns a listener that accepts unencrypted connections
// using the default timeout values.
func NewUnixListener(ln net.Listener) *unixListener {
return &unixListener{
UnixListener: ln.(*net.UnixListener),
acceptDeadline: time.Second * defaultAcceptDeadlineSeconds,
connDeadline: time.Second * defaultConnDeadlineSeconds,
}
}
// Accept implements net.Listener.
func (ln *unixListener) Accept() (net.Conn, error) {
err := ln.SetDeadline(time.Now().Add(ln.acceptDeadline))
if err != nil {
return nil, err
}
tc, err := ln.AcceptUnix()
if err != nil {
return nil, err
}
// Wrap the conn in our timeout wrapper
conn := newTimeoutConn(tc, ln.connDeadline)
// TODO: wrap in something that authenticates
// with a MAC - https://github.com/tendermint/tendermint/issues/3099
return conn, nil
}
//------------------------------------------------------------------
// Connection
// timeoutConn implements net.Conn.
var _ net.Conn = (*timeoutConn)(nil)
// timeoutConn wraps a net.Conn to standardise protocol timeouts / deadline resets.
type timeoutConn struct {
net.Conn
connDeadline time.Duration
}
// newTimeoutConn returns an instance of newTCPTimeoutConn.
func newTimeoutConn(
conn net.Conn,
connDeadline time.Duration) *timeoutConn {
return &timeoutConn{
conn,
connDeadline,
}
}
// Read implements net.Conn.
func (c timeoutConn) Read(b []byte) (n int, err error) {
// Reset deadline
c.Conn.SetReadDeadline(time.Now().Add(c.connDeadline))
return c.Conn.Read(b)
}
// Write implements net.Conn.
func (c timeoutConn) Write(b []byte) (n int, err error) {
// Reset deadline
c.Conn.SetWriteDeadline(time.Now().Add(c.connDeadline))
return c.Conn.Write(b)
}

View File

@ -4,17 +4,31 @@ import (
"net" "net"
"testing" "testing"
"time" "time"
"github.com/tendermint/tendermint/crypto/ed25519"
) )
func TestTCPTimeoutListenerAcceptDeadline(t *testing.T) { //-------------------------------------------
// helper funcs
func newPrivKey() ed25519.PrivKeyEd25519 {
return ed25519.GenPrivKey()
}
//-------------------------------------------
// tests
func TestTCPListenerAcceptDeadline(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0") ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ln = newTCPTimeoutListener(ln, time.Millisecond, time.Second, time.Second) tcpLn := NewTCPListener(ln, newPrivKey())
TCPListenerAcceptDeadline(time.Millisecond)(tcpLn)
TCPListenerConnDeadline(time.Second)(tcpLn)
_, err = ln.Accept() _, err = tcpLn.Accept()
opErr, ok := err.(*net.OpError) opErr, ok := err.(*net.OpError)
if !ok { if !ok {
t.Fatalf("have %v, want *net.OpError", err) t.Fatalf("have %v, want *net.OpError", err)
@ -25,14 +39,17 @@ func TestTCPTimeoutListenerAcceptDeadline(t *testing.T) {
} }
} }
func TestTCPTimeoutListenerConnDeadline(t *testing.T) { func TestTCPListenerConnDeadline(t *testing.T) {
ln, err := net.Listen("tcp", "127.0.0.1:0") ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
ln = newTCPTimeoutListener(ln, time.Second, time.Millisecond, time.Second) tcpLn := NewTCPListener(ln, newPrivKey())
TCPListenerAcceptDeadline(time.Second)(tcpLn)
TCPListenerConnDeadline(time.Millisecond)(tcpLn)
readyc := make(chan struct{})
donec := make(chan struct{}) donec := make(chan struct{})
go func(ln net.Listener) { go func(ln net.Listener) {
defer close(donec) defer close(donec)
@ -41,6 +58,7 @@ func TestTCPTimeoutListenerConnDeadline(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
<-readyc
time.Sleep(2 * time.Millisecond) time.Sleep(2 * time.Millisecond)
@ -54,12 +72,13 @@ func TestTCPTimeoutListenerConnDeadline(t *testing.T) {
if have, want := opErr.Op, "read"; have != want { if have, want := opErr.Op, "read"; have != want {
t.Errorf("have %v, want %v", have, want) t.Errorf("have %v, want %v", have, want)
} }
}(ln) }(tcpLn)
_, err = net.Dial("tcp", ln.Addr().String()) dialer := DialTCPFn(ln.Addr().String(), testConnDeadline, newPrivKey())
_, err = dialer()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
close(readyc)
<-donec <-donec
} }

View File

@ -1,216 +0,0 @@
package privval
import (
"errors"
"net"
"time"
"github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
p2pconn "github.com/tendermint/tendermint/p2p/conn"
"github.com/tendermint/tendermint/types"
)
const (
defaultAcceptDeadlineSeconds = 3
defaultConnDeadlineSeconds = 3
defaultConnHeartBeatSeconds = 2
defaultDialRetries = 10
)
// Socket errors.
var (
ErrDialRetryMax = errors.New("dialed maximum retries")
ErrConnTimeout = errors.New("remote signer timed out")
ErrUnexpectedResponse = errors.New("received unexpected response")
)
var (
acceptDeadline = time.Second * defaultAcceptDeadlineSeconds
connTimeout = time.Second * defaultConnDeadlineSeconds
connHeartbeat = time.Second * defaultConnHeartBeatSeconds
)
// TCPValOption sets an optional parameter on the SocketPV.
type TCPValOption func(*TCPVal)
// TCPValAcceptDeadline sets the deadline for the TCPVal listener.
// A zero time value disables the deadline.
func TCPValAcceptDeadline(deadline time.Duration) TCPValOption {
return func(sc *TCPVal) { sc.acceptDeadline = deadline }
}
// TCPValConnTimeout sets the read and write timeout for connections
// from external signing processes.
func TCPValConnTimeout(timeout time.Duration) TCPValOption {
return func(sc *TCPVal) { sc.connTimeout = timeout }
}
// TCPValHeartbeat sets the period on which to check the liveness of the
// connected Signer connections.
func TCPValHeartbeat(period time.Duration) TCPValOption {
return func(sc *TCPVal) { sc.connHeartbeat = period }
}
// TCPVal implements PrivValidator, it uses a socket to request signatures
// from an external process.
type TCPVal struct {
cmn.BaseService
*RemoteSignerClient
addr string
acceptDeadline time.Duration
connTimeout time.Duration
connHeartbeat time.Duration
privKey ed25519.PrivKeyEd25519
conn net.Conn
listener net.Listener
cancelPing chan struct{}
pingTicker *time.Ticker
}
// Check that TCPVal implements PrivValidator.
var _ types.PrivValidator = (*TCPVal)(nil)
// NewTCPVal returns an instance of TCPVal.
func NewTCPVal(
logger log.Logger,
socketAddr string,
privKey ed25519.PrivKeyEd25519,
) *TCPVal {
sc := &TCPVal{
addr: socketAddr,
acceptDeadline: acceptDeadline,
connTimeout: connTimeout,
connHeartbeat: connHeartbeat,
privKey: privKey,
}
sc.BaseService = *cmn.NewBaseService(logger, "TCPVal", sc)
return sc
}
// OnStart implements cmn.Service.
func (sc *TCPVal) OnStart() error {
if err := sc.listen(); err != nil {
sc.Logger.Error("OnStart", "err", err)
return err
}
conn, err := sc.waitConnection()
if err != nil {
sc.Logger.Error("OnStart", "err", err)
return err
}
sc.conn = conn
sc.RemoteSignerClient, err = NewRemoteSignerClient(sc.conn)
if err != nil {
return err
}
// Start a routine to keep the connection alive
sc.cancelPing = make(chan struct{}, 1)
sc.pingTicker = time.NewTicker(sc.connHeartbeat)
go func() {
for {
select {
case <-sc.pingTicker.C:
err := sc.Ping()
if err != nil {
sc.Logger.Error(
"Ping",
"err", err,
)
}
case <-sc.cancelPing:
sc.pingTicker.Stop()
return
}
}
}()
return nil
}
// OnStop implements cmn.Service.
func (sc *TCPVal) OnStop() {
if sc.cancelPing != nil {
close(sc.cancelPing)
}
if sc.conn != nil {
if err := sc.conn.Close(); err != nil {
sc.Logger.Error("OnStop", "err", err)
}
}
if sc.listener != nil {
if err := sc.listener.Close(); err != nil {
sc.Logger.Error("OnStop", "err", err)
}
}
}
func (sc *TCPVal) acceptConnection() (net.Conn, error) {
conn, err := sc.listener.Accept()
if err != nil {
if !sc.IsRunning() {
return nil, nil // Ignore error from listener closing.
}
return nil, err
}
conn, err = p2pconn.MakeSecretConnection(conn, sc.privKey)
if err != nil {
return nil, err
}
return conn, nil
}
func (sc *TCPVal) listen() error {
ln, err := net.Listen(cmn.ProtocolAndAddress(sc.addr))
if err != nil {
return err
}
sc.listener = newTCPTimeoutListener(
ln,
sc.acceptDeadline,
sc.connTimeout,
sc.connHeartbeat,
)
return nil
}
// waitConnection uses the configured wait timeout to error if no external
// process connects in the time period.
func (sc *TCPVal) waitConnection() (net.Conn, error) {
var (
connc = make(chan net.Conn, 1)
errc = make(chan error, 1)
)
go func(connc chan<- net.Conn, errc chan<- error) {
conn, err := sc.acceptConnection()
if err != nil {
errc <- err
return
}
connc <- conn
}(connc, errc)
select {
case conn := <-connc:
return conn, nil
case err := <-errc:
return nil, err
}
}

View File

@ -1,90 +0,0 @@
package privval
import (
"net"
"time"
)
// timeoutError can be used to check if an error returned from the netp package
// was due to a timeout.
type timeoutError interface {
Timeout() bool
}
// tcpTimeoutListener implements net.Listener.
var _ net.Listener = (*tcpTimeoutListener)(nil)
// tcpTimeoutListener wraps a *net.TCPListener to standardise protocol timeouts
// and potentially other tuning parameters.
type tcpTimeoutListener struct {
*net.TCPListener
acceptDeadline time.Duration
connDeadline time.Duration
period time.Duration
}
// timeoutConn wraps a net.Conn to standardise protocol timeouts / deadline resets.
type timeoutConn struct {
net.Conn
connDeadline time.Duration
}
// newTCPTimeoutListener returns an instance of tcpTimeoutListener.
func newTCPTimeoutListener(
ln net.Listener,
acceptDeadline, connDeadline time.Duration,
period time.Duration,
) tcpTimeoutListener {
return tcpTimeoutListener{
TCPListener: ln.(*net.TCPListener),
acceptDeadline: acceptDeadline,
connDeadline: connDeadline,
period: period,
}
}
// newTimeoutConn returns an instance of newTCPTimeoutConn.
func newTimeoutConn(
conn net.Conn,
connDeadline time.Duration) *timeoutConn {
return &timeoutConn{
conn,
connDeadline,
}
}
// Accept implements net.Listener.
func (ln tcpTimeoutListener) Accept() (net.Conn, error) {
err := ln.SetDeadline(time.Now().Add(ln.acceptDeadline))
if err != nil {
return nil, err
}
tc, err := ln.AcceptTCP()
if err != nil {
return nil, err
}
// Wrap the conn in our timeout wrapper
conn := newTimeoutConn(tc, ln.connDeadline)
return conn, nil
}
// Read implements net.Listener.
func (c timeoutConn) Read(b []byte) (n int, err error) {
// Reset deadline
c.Conn.SetReadDeadline(time.Now().Add(c.connDeadline))
return c.Conn.Read(b)
}
// Write implements net.Listener.
func (c timeoutConn) Write(b []byte) (n int, err error) {
// Reset deadline
c.Conn.SetWriteDeadline(time.Now().Add(c.connDeadline))
return c.Conn.Write(b)
}