diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 7fc9bead..fd95a944 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -10,8 +10,8 @@ Special thanks to external contributors on this release: - [cli] Removed `node` `--proxy_app=dummy` option. Use `kvstore` (`persistent_kvstore`) instead. - [cli] Renamed `node` `--proxy_app=nilapp` to `--proxy_app=noop`. - [config] \#2992 `allow_duplicate_ip` is now set to false - - [privval] \#2926 split up `PubKeyMsg` into `PubKeyRequest` and `PubKeyResponse` to be consistent with other message types +- [privval] \#2923 listen for unix socket connections instead of dialing them * Apps @@ -23,13 +23,13 @@ Special thanks to external contributors on this release: * Blockchain Protocol * P2P Protocol -- multiple connections from the same IP are now disabled by default (see `allow_duplicate_ip` config option) ### FEATURES: -- [privval] \#1181 Split immutable and mutable parts of priv_validator.json +- [privval] \#1181 Split immutable and mutable parts of `priv_validator.json` ### IMPROVEMENTS: - [p2p/conn] \#3111 make SecretConnection thread safe +- [privval] \#2923 retry RemoteSigner connections on error - [rpc] \#3047 Include peer's remote IP in `/net_info` ### BUG FIXES: diff --git a/cmd/priv_val_server/main.go b/cmd/priv_val_server/main.go index 54602558..6949e878 100644 --- a/cmd/priv_val_server/main.go +++ b/cmd/priv_val_server/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "os" + "time" "github.com/tendermint/tendermint/crypto/ed25519" cmn "github.com/tendermint/tendermint/libs/common" @@ -34,13 +35,20 @@ func main() { pv := privval.LoadFilePV(*privValKeyPath, *privValStatePath) - rs := privval.NewRemoteSigner( - logger, - *chainID, - *addr, - pv, - ed25519.GenPrivKey(), - ) + var dialer privval.Dialer + protocol, address := cmn.ProtocolAndAddress(*addr) + switch protocol { + case "unix": + dialer = privval.DialUnixFn(address) + case "tcp": + connTimeout := 3 * time.Second // TODO + dialer = privval.DialTCPFn(address, connTimeout, ed25519.GenPrivKey()) + default: + logger.Error("Unknown protocol", "protocol", protocol) + return + } + + rs := privval.NewRemoteSigner(logger, *chainID, pv, dialer) err := rs.Start() if err != nil { panic(err) diff --git a/node/node.go b/node/node.go index be4c7cc7..b7998dac 100644 --- a/node/node.go +++ b/node/node.go @@ -878,16 +878,20 @@ func createAndStartPrivValidatorSocketClient( listenAddr string, logger log.Logger, ) (types.PrivValidator, error) { - var pvsc types.PrivValidator + var listener net.Listener protocol, address := cmn.ProtocolAndAddress(listenAddr) + ln, err := net.Listen(protocol, address) + if err != nil { + return nil, err + } switch protocol { case "unix": - pvsc = privval.NewIPCVal(logger.With("module", "privval"), address) + listener = privval.NewUnixListener(ln) case "tcp": // TODO: persist this key so external signer // can actually authenticate us - pvsc = privval.NewTCPVal(logger.With("module", "privval"), listenAddr, ed25519.GenPrivKey()) + listener = privval.NewTCPListener(ln, ed25519.GenPrivKey()) default: return nil, fmt.Errorf( "Wrong listen address: expected either 'tcp' or 'unix' protocols, got %s", @@ -895,10 +899,9 @@ func createAndStartPrivValidatorSocketClient( ) } - if pvsc, ok := pvsc.(cmn.Service); ok { - if err := pvsc.Start(); err != nil { - return nil, errors.Wrap(err, "failed to start") - } + pvsc := privval.NewSocketVal(logger.With("module", "privval"), listener) + if err := pvsc.Start(); err != nil { + return nil, errors.Wrap(err, "failed to start") } return pvsc, nil diff --git a/node/node_test.go b/node/node_test.go index e675eb9a..96d779d4 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -122,25 +122,25 @@ func TestNodeSetPrivValTCP(t *testing.T) { config := cfg.ResetTestRoot("node_priv_val_tcp_test") config.BaseConfig.PrivValidatorListenAddr = addr - rs := privval.NewRemoteSigner( + dialer := privval.DialTCPFn(addr, 100*time.Millisecond, ed25519.GenPrivKey()) + pvsc := privval.NewRemoteSigner( log.TestingLogger(), config.ChainID(), - addr, types.NewMockPV(), - ed25519.GenPrivKey(), + dialer, ) - privval.RemoteSignerConnDeadline(5 * time.Millisecond)(rs) + go func() { - err := rs.Start() + err := pvsc.Start() if err != nil { panic(err) } }() - defer rs.Stop() + defer pvsc.Stop() n, err := DefaultNewNode(config, log.TestingLogger()) require.NoError(t, err) - assert.IsType(t, &privval.TCPVal{}, n.PrivValidator()) + assert.IsType(t, &privval.SocketVal{}, n.PrivValidator()) } // address without a protocol must result in error @@ -161,25 +161,25 @@ func TestNodeSetPrivValIPC(t *testing.T) { config := cfg.ResetTestRoot("node_priv_val_tcp_test") config.BaseConfig.PrivValidatorListenAddr = "unix://" + tmpfile - rs := privval.NewIPCRemoteSigner( + dialer := privval.DialUnixFn(tmpfile) + pvsc := privval.NewRemoteSigner( log.TestingLogger(), config.ChainID(), - tmpfile, types.NewMockPV(), + dialer, ) - privval.IPCRemoteSignerConnDeadline(3 * time.Second)(rs) done := make(chan struct{}) go func() { defer close(done) n, err := DefaultNewNode(config, log.TestingLogger()) require.NoError(t, err) - assert.IsType(t, &privval.IPCVal{}, n.PrivValidator()) + assert.IsType(t, &privval.SocketVal{}, n.PrivValidator()) }() - err := rs.Start() + err := pvsc.Start() require.NoError(t, err) - defer rs.Stop() + defer pvsc.Stop() <-done } diff --git a/privval/client.go b/privval/client.go new file mode 100644 index 00000000..4d4395fd --- /dev/null +++ b/privval/client.go @@ -0,0 +1,263 @@ +package privval + +import ( + "errors" + "fmt" + "net" + "sync" + "time" + + "github.com/tendermint/tendermint/crypto" + cmn "github.com/tendermint/tendermint/libs/common" + "github.com/tendermint/tendermint/libs/log" + "github.com/tendermint/tendermint/types" +) + +const ( + defaultConnHeartBeatSeconds = 2 + defaultDialRetries = 10 +) + +// Socket errors. +var ( + ErrUnexpectedResponse = errors.New("received unexpected response") +) + +var ( + connHeartbeat = time.Second * defaultConnHeartBeatSeconds +) + +// SocketValOption sets an optional parameter on the SocketVal. +type SocketValOption func(*SocketVal) + +// SocketValHeartbeat sets the period on which to check the liveness of the +// connected Signer connections. +func SocketValHeartbeat(period time.Duration) SocketValOption { + return func(sc *SocketVal) { sc.connHeartbeat = period } +} + +// SocketVal implements PrivValidator. +// It listens for an external process to dial in and uses +// the socket to request signatures. +type SocketVal struct { + cmn.BaseService + + listener net.Listener + + // ping + cancelPing chan struct{} + pingTicker *time.Ticker + connHeartbeat time.Duration + + // signer is mutable since it can be + // reset if the connection fails. + // failures are detected by a background + // ping routine. + // Methods on the underlying net.Conn itself + // are already gorountine safe. + mtx sync.RWMutex + signer *RemoteSignerClient +} + +// Check that SocketVal implements PrivValidator. +var _ types.PrivValidator = (*SocketVal)(nil) + +// NewSocketVal returns an instance of SocketVal. +func NewSocketVal( + logger log.Logger, + listener net.Listener, +) *SocketVal { + sc := &SocketVal{ + listener: listener, + connHeartbeat: connHeartbeat, + } + + sc.BaseService = *cmn.NewBaseService(logger, "SocketVal", sc) + + return sc +} + +//-------------------------------------------------------- +// Implement PrivValidator + +// GetPubKey implements PrivValidator. +func (sc *SocketVal) GetPubKey() crypto.PubKey { + sc.mtx.RLock() + defer sc.mtx.RUnlock() + return sc.signer.GetPubKey() +} + +// SignVote implements PrivValidator. +func (sc *SocketVal) SignVote(chainID string, vote *types.Vote) error { + sc.mtx.RLock() + defer sc.mtx.RUnlock() + return sc.signer.SignVote(chainID, vote) +} + +// SignProposal implements PrivValidator. +func (sc *SocketVal) SignProposal(chainID string, proposal *types.Proposal) error { + sc.mtx.RLock() + defer sc.mtx.RUnlock() + return sc.signer.SignProposal(chainID, proposal) +} + +//-------------------------------------------------------- +// More thread safe methods proxied to the signer + +// Ping is used to check connection health. +func (sc *SocketVal) Ping() error { + sc.mtx.RLock() + defer sc.mtx.RUnlock() + return sc.signer.Ping() +} + +// Close closes the underlying net.Conn. +func (sc *SocketVal) Close() { + sc.mtx.RLock() + defer sc.mtx.RUnlock() + if sc.signer != nil { + if err := sc.signer.Close(); err != nil { + sc.Logger.Error("OnStop", "err", err) + } + } + + if sc.listener != nil { + if err := sc.listener.Close(); err != nil { + sc.Logger.Error("OnStop", "err", err) + } + } +} + +//-------------------------------------------------------- +// Service start and stop + +// OnStart implements cmn.Service. +func (sc *SocketVal) OnStart() error { + if closed, err := sc.reset(); err != nil { + sc.Logger.Error("OnStart", "err", err) + return err + } else if closed { + return fmt.Errorf("listener is closed") + } + + // Start a routine to keep the connection alive + sc.cancelPing = make(chan struct{}, 1) + sc.pingTicker = time.NewTicker(sc.connHeartbeat) + go func() { + for { + select { + case <-sc.pingTicker.C: + err := sc.Ping() + if err != nil { + sc.Logger.Error("Ping", "err", err) + if err == ErrUnexpectedResponse { + return + } + + closed, err := sc.reset() + if err != nil { + sc.Logger.Error("Reconnecting to remote signer failed", "err", err) + continue + } + if closed { + sc.Logger.Info("listener is closing") + return + } + + sc.Logger.Info("Re-created connection to remote signer", "impl", sc) + } + case <-sc.cancelPing: + sc.pingTicker.Stop() + return + } + } + }() + + return nil +} + +// OnStop implements cmn.Service. +func (sc *SocketVal) OnStop() { + if sc.cancelPing != nil { + close(sc.cancelPing) + } + sc.Close() +} + +//-------------------------------------------------------- +// Connection and signer management + +// waits to accept and sets a new connection. +// connection is closed in OnStop. +// returns true if the listener is closed +// (ie. it returns a nil conn). +func (sc *SocketVal) reset() (bool, error) { + sc.mtx.Lock() + defer sc.mtx.Unlock() + + // first check if the conn already exists and close it. + if sc.signer != nil { + if err := sc.signer.Close(); err != nil { + sc.Logger.Error("error closing connection", "err", err) + } + } + + // wait for a new conn + conn, err := sc.waitConnection() + if err != nil { + return false, err + } + + // listener is closed + if conn == nil { + return true, nil + } + + sc.signer, err = NewRemoteSignerClient(conn) + if err != nil { + // failed to fetch the pubkey. close out the connection. + if err := conn.Close(); err != nil { + sc.Logger.Error("error closing connection", "err", err) + } + return false, err + } + return false, nil +} + +func (sc *SocketVal) acceptConnection() (net.Conn, error) { + conn, err := sc.listener.Accept() + if err != nil { + if !sc.IsRunning() { + return nil, nil // Ignore error from listener closing. + } + return nil, err + + } + return conn, nil +} + +// waitConnection uses the configured wait timeout to error if no external +// process connects in the time period. +func (sc *SocketVal) waitConnection() (net.Conn, error) { + var ( + connc = make(chan net.Conn, 1) + errc = make(chan error, 1) + ) + + go func(connc chan<- net.Conn, errc chan<- error) { + conn, err := sc.acceptConnection() + if err != nil { + errc <- err + return + } + + connc <- conn + }(connc, errc) + + select { + case conn := <-connc: + return conn, nil + case err := <-errc: + return nil, err + } +} diff --git a/privval/tcp_test.go b/privval/client_test.go similarity index 70% rename from privval/tcp_test.go rename to privval/client_test.go index e893ef40..7fae6bf8 100644 --- a/privval/tcp_test.go +++ b/privval/client_test.go @@ -17,6 +17,16 @@ import ( "github.com/tendermint/tendermint/types" ) +var ( + testAcceptDeadline = defaultAcceptDeadlineSeconds * time.Second + + testConnDeadline = 100 * time.Millisecond + testConnDeadline2o3 = 66 * time.Millisecond // 2/3 of the other one + + testHeartbeatTimeout = 10 * time.Millisecond + testHeartbeatTimeout3o2 = 6 * time.Millisecond // 3/2 of the other one +) + func TestSocketPVAddress(t *testing.T) { var ( chainID = cmn.RandStr(12) @@ -39,8 +49,7 @@ func TestSocketPVPubKey(t *testing.T) { defer sc.Stop() defer rs.Stop() - clientKey, err := sc.getPubKey() - require.NoError(t, err) + clientKey := sc.GetPubKey() privvalPubKey := rs.privVal.GetPubKey() @@ -95,14 +104,14 @@ func TestSocketPVVoteResetDeadline(t *testing.T) { defer sc.Stop() defer rs.Stop() - time.Sleep(3 * time.Millisecond) + time.Sleep(testConnDeadline2o3) require.NoError(t, rs.privVal.SignVote(chainID, want)) require.NoError(t, sc.SignVote(chainID, have)) assert.Equal(t, want.Signature, have.Signature) // This would exceed the deadline if it was not extended by the previous message - time.Sleep(3 * time.Millisecond) + time.Sleep(testConnDeadline2o3) require.NoError(t, rs.privVal.SignVote(chainID, want)) require.NoError(t, sc.SignVote(chainID, have)) @@ -122,7 +131,7 @@ func TestSocketPVVoteKeepalive(t *testing.T) { defer sc.Stop() defer rs.Stop() - time.Sleep(10 * time.Millisecond) + time.Sleep(testConnDeadline * 2) require.NoError(t, rs.privVal.SignVote(chainID, want)) require.NoError(t, sc.SignVote(chainID, have)) @@ -131,18 +140,13 @@ func TestSocketPVVoteKeepalive(t *testing.T) { func TestSocketPVDeadline(t *testing.T) { var ( - addr = testFreeAddr(t) - listenc = make(chan struct{}) - sc = NewTCPVal( - log.TestingLogger(), - addr, - ed25519.GenPrivKey(), - ) + addr = testFreeAddr(t) + listenc = make(chan struct{}) + thisConnTimeout = 100 * time.Millisecond + sc = newSocketVal(log.TestingLogger(), addr, thisConnTimeout) ) - TCPValConnTimeout(100 * time.Millisecond)(sc) - - go func(sc *TCPVal) { + go func(sc *SocketVal) { defer close(listenc) assert.Equal(t, sc.Start().(cmn.Error).Data(), ErrConnTimeout) @@ -199,9 +203,8 @@ func TestRemoteSignerRetry(t *testing.T) { rs := NewRemoteSigner( log.TestingLogger(), cmn.RandStr(12), - ln.Addr().String(), types.NewMockPV(), - ed25519.GenPrivKey(), + DialTCPFn(ln.Addr().String(), testConnDeadline, ed25519.GenPrivKey()), ) defer rs.Stop() @@ -230,15 +233,8 @@ func TestRemoteSignVoteErrors(t *testing.T) { defer sc.Stop() defer rs.Stop() - err := writeMsg(sc.conn, &SignVoteRequest{Vote: vote}) - require.NoError(t, err) - - res, err := readMsg(sc.conn) - require.NoError(t, err) - - resp := *res.(*SignedVoteResponse) - require.NotNil(t, resp.Error) - require.Equal(t, resp.Error.Description, types.ErroringMockPVErr.Error()) + err := sc.SignVote("", vote) + require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error()) err = rs.privVal.SignVote(chainID, vote) require.Error(t, err) @@ -257,15 +253,8 @@ func TestRemoteSignProposalErrors(t *testing.T) { defer sc.Stop() defer rs.Stop() - err := writeMsg(sc.conn, &SignProposalRequest{Proposal: proposal}) - require.NoError(t, err) - - res, err := readMsg(sc.conn) - require.NoError(t, err) - - resp := *res.(*SignedProposalResponse) - require.NotNil(t, resp.Error) - require.Equal(t, resp.Error.Description, types.ErroringMockPVErr.Error()) + err := sc.SignProposal("", proposal) + require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error()) err = rs.privVal.SignProposal(chainID, proposal) require.Error(t, err) @@ -285,15 +274,10 @@ func TestErrUnexpectedResponse(t *testing.T) { rs = NewRemoteSigner( logger, chainID, - addr, types.NewMockPV(), - ed25519.GenPrivKey(), - ) - sc = NewTCPVal( - logger, - addr, - ed25519.GenPrivKey(), + DialTCPFn(addr, testConnDeadline, ed25519.GenPrivKey()), ) + sc = newSocketVal(logger, addr, testConnDeadline) ) testStartSocketPV(t, readyc, sc) @@ -331,11 +315,73 @@ func TestErrUnexpectedResponse(t *testing.T) { require.Equal(t, err, ErrUnexpectedResponse) } +func TestRetryTCPConnToRemoteSigner(t *testing.T) { + var ( + addr = testFreeAddr(t) + logger = log.TestingLogger() + chainID = cmn.RandStr(12) + readyc = make(chan struct{}) + + rs = NewRemoteSigner( + logger, + chainID, + types.NewMockPV(), + DialTCPFn(addr, testConnDeadline, ed25519.GenPrivKey()), + ) + thisConnTimeout = testConnDeadline + sc = newSocketVal(logger, addr, thisConnTimeout) + ) + // Ping every: + SocketValHeartbeat(testHeartbeatTimeout)(sc) + + RemoteSignerConnDeadline(testConnDeadline)(rs) + RemoteSignerConnRetries(10)(rs) + + testStartSocketPV(t, readyc, sc) + defer sc.Stop() + require.NoError(t, rs.Start()) + assert.True(t, rs.IsRunning()) + + <-readyc + time.Sleep(testHeartbeatTimeout * 2) + + rs.Stop() + rs2 := NewRemoteSigner( + logger, + chainID, + types.NewMockPV(), + DialTCPFn(addr, testConnDeadline, ed25519.GenPrivKey()), + ) + // let some pings pass + time.Sleep(testHeartbeatTimeout3o2) + require.NoError(t, rs2.Start()) + assert.True(t, rs2.IsRunning()) + defer rs2.Stop() + + // give the client some time to re-establish the conn to the remote signer + // should see sth like this in the logs: + // + // E[10016-01-10|17:12:46.128] Ping err="remote signer timed out" + // I[10016-01-10|17:16:42.447] Re-created connection to remote signer impl=SocketVal + time.Sleep(testConnDeadline * 2) +} + +func newSocketVal(logger log.Logger, addr string, connDeadline time.Duration) *SocketVal { + ln, err := net.Listen(cmn.ProtocolAndAddress(addr)) + if err != nil { + panic(err) + } + tcpLn := NewTCPListener(ln, ed25519.GenPrivKey()) + TCPListenerAcceptDeadline(testAcceptDeadline)(tcpLn) + TCPListenerConnDeadline(testConnDeadline)(tcpLn) + return NewSocketVal(logger, tcpLn) +} + func testSetupSocketPair( t *testing.T, chainID string, privValidator types.PrivValidator, -) (*TCPVal, *RemoteSigner) { +) (*SocketVal, *RemoteSigner) { var ( addr = testFreeAddr(t) logger = log.TestingLogger() @@ -344,20 +390,16 @@ func testSetupSocketPair( rs = NewRemoteSigner( logger, chainID, - addr, privVal, - ed25519.GenPrivKey(), - ) - sc = NewTCPVal( - logger, - addr, - ed25519.GenPrivKey(), + DialTCPFn(addr, testConnDeadline, ed25519.GenPrivKey()), ) + + thisConnTimeout = testConnDeadline + sc = newSocketVal(logger, addr, thisConnTimeout) ) - TCPValConnTimeout(5 * time.Millisecond)(sc) - TCPValHeartbeat(2 * time.Millisecond)(sc) - RemoteSignerConnDeadline(5 * time.Millisecond)(rs) + SocketValHeartbeat(testHeartbeatTimeout)(sc) + RemoteSignerConnDeadline(testConnDeadline)(rs) RemoteSignerConnRetries(1e6)(rs) testStartSocketPV(t, readyc, sc) @@ -378,8 +420,8 @@ func testReadWriteResponse(t *testing.T, resp RemoteSignerMsg, rsConn net.Conn) require.NoError(t, err) } -func testStartSocketPV(t *testing.T, readyc chan struct{}, sc *TCPVal) { - go func(sc *TCPVal) { +func testStartSocketPV(t *testing.T, readyc chan struct{}, sc *SocketVal) { + go func(sc *SocketVal) { require.NoError(t, sc.Start()) assert.True(t, sc.IsRunning()) diff --git a/privval/doc.go b/privval/doc.go new file mode 100644 index 00000000..ed378c19 --- /dev/null +++ b/privval/doc.go @@ -0,0 +1,21 @@ +/* + +Package privval provides different implementations of the types.PrivValidator. + +FilePV + +FilePV is the simplest implementation and developer default. It uses one file for the private key and another to store state. + +SocketVal + +SocketVal establishes a connection to an external process, like a Key Management Server (KMS), using a socket. +SocketVal listens for the external KMS process to dial in. +SocketVal takes a listener, which determines the type of connection +(ie. encrypted over tcp, or unencrypted over unix). + +RemoteSigner + +RemoteSigner is a simple wrapper around a net.Conn. It's used by both IPCVal and TCPVal. + +*/ +package privval diff --git a/privval/priv_validator.go b/privval/file.go similarity index 99% rename from privval/priv_validator.go rename to privval/file.go index 1ee5b4d8..8072cfa4 100644 --- a/privval/priv_validator.go +++ b/privval/file.go @@ -22,6 +22,7 @@ const ( stepPrecommit int8 = 3 ) +// A vote is either stepPrevote or stepPrecommit. func voteToStep(vote *types.Vote) int8 { switch vote.Type { case types.PrevoteType: @@ -29,7 +30,7 @@ func voteToStep(vote *types.Vote) int8 { case types.PrecommitType: return stepPrecommit default: - cmn.PanicSanity("Unknown vote type") + panic("Unknown vote type") return 0 } } diff --git a/privval/priv_validator_test.go b/privval/file_test.go similarity index 100% rename from privval/priv_validator_test.go rename to privval/file_test.go diff --git a/privval/ipc.go b/privval/ipc.go deleted file mode 100644 index 1c82db33..00000000 --- a/privval/ipc.go +++ /dev/null @@ -1,123 +0,0 @@ -package privval - -import ( - "net" - "time" - - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/types" -) - -// IPCValOption sets an optional parameter on the SocketPV. -type IPCValOption func(*IPCVal) - -// IPCValConnTimeout sets the read and write timeout for connections -// from external signing processes. -func IPCValConnTimeout(timeout time.Duration) IPCValOption { - return func(sc *IPCVal) { sc.connTimeout = timeout } -} - -// IPCValHeartbeat sets the period on which to check the liveness of the -// connected Signer connections. -func IPCValHeartbeat(period time.Duration) IPCValOption { - return func(sc *IPCVal) { sc.connHeartbeat = period } -} - -// IPCVal implements PrivValidator, it uses a unix socket to request signatures -// from an external process. -type IPCVal struct { - cmn.BaseService - *RemoteSignerClient - - addr string - - connTimeout time.Duration - connHeartbeat time.Duration - - conn net.Conn - cancelPing chan struct{} - pingTicker *time.Ticker -} - -// Check that IPCVal implements PrivValidator. -var _ types.PrivValidator = (*IPCVal)(nil) - -// NewIPCVal returns an instance of IPCVal. -func NewIPCVal( - logger log.Logger, - socketAddr string, -) *IPCVal { - sc := &IPCVal{ - addr: socketAddr, - connTimeout: connTimeout, - connHeartbeat: connHeartbeat, - } - - sc.BaseService = *cmn.NewBaseService(logger, "IPCVal", sc) - - return sc -} - -// OnStart implements cmn.Service. -func (sc *IPCVal) OnStart() error { - err := sc.connect() - if err != nil { - sc.Logger.Error("OnStart", "err", err) - return err - } - - sc.RemoteSignerClient, err = NewRemoteSignerClient(sc.conn) - if err != nil { - return err - } - - // Start a routine to keep the connection alive - sc.cancelPing = make(chan struct{}, 1) - sc.pingTicker = time.NewTicker(sc.connHeartbeat) - go func() { - for { - select { - case <-sc.pingTicker.C: - err := sc.Ping() - if err != nil { - sc.Logger.Error("Ping", "err", err) - } - case <-sc.cancelPing: - sc.pingTicker.Stop() - return - } - } - }() - - return nil -} - -// OnStop implements cmn.Service. -func (sc *IPCVal) OnStop() { - if sc.cancelPing != nil { - close(sc.cancelPing) - } - - if sc.conn != nil { - if err := sc.conn.Close(); err != nil { - sc.Logger.Error("OnStop", "err", err) - } - } -} - -func (sc *IPCVal) connect() error { - la, err := net.ResolveUnixAddr("unix", sc.addr) - if err != nil { - return err - } - - conn, err := net.DialUnix("unix", nil, la) - if err != nil { - return err - } - - sc.conn = newTimeoutConn(conn, sc.connTimeout) - - return nil -} diff --git a/privval/ipc_server.go b/privval/ipc_server.go deleted file mode 100644 index ba957477..00000000 --- a/privval/ipc_server.go +++ /dev/null @@ -1,132 +0,0 @@ -package privval - -import ( - "io" - "net" - "time" - - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/types" -) - -// IPCRemoteSignerOption sets an optional parameter on the IPCRemoteSigner. -type IPCRemoteSignerOption func(*IPCRemoteSigner) - -// IPCRemoteSignerConnDeadline sets the read and write deadline for connections -// from external signing processes. -func IPCRemoteSignerConnDeadline(deadline time.Duration) IPCRemoteSignerOption { - return func(ss *IPCRemoteSigner) { ss.connDeadline = deadline } -} - -// IPCRemoteSignerConnRetries sets the amount of attempted retries to connect. -func IPCRemoteSignerConnRetries(retries int) IPCRemoteSignerOption { - return func(ss *IPCRemoteSigner) { ss.connRetries = retries } -} - -// IPCRemoteSigner is a RPC implementation of PrivValidator that listens on a unix socket. -type IPCRemoteSigner struct { - cmn.BaseService - - addr string - chainID string - connDeadline time.Duration - connRetries int - privVal types.PrivValidator - - listener *net.UnixListener -} - -// NewIPCRemoteSigner returns an instance of IPCRemoteSigner. -func NewIPCRemoteSigner( - logger log.Logger, - chainID, socketAddr string, - privVal types.PrivValidator, -) *IPCRemoteSigner { - rs := &IPCRemoteSigner{ - addr: socketAddr, - chainID: chainID, - connDeadline: time.Second * defaultConnDeadlineSeconds, - connRetries: defaultDialRetries, - privVal: privVal, - } - - rs.BaseService = *cmn.NewBaseService(logger, "IPCRemoteSigner", rs) - - return rs -} - -// OnStart implements cmn.Service. -func (rs *IPCRemoteSigner) OnStart() error { - err := rs.listen() - if err != nil { - err = cmn.ErrorWrap(err, "listen") - rs.Logger.Error("OnStart", "err", err) - return err - } - - go func() { - for { - conn, err := rs.listener.AcceptUnix() - if err != nil { - rs.Logger.Error("AcceptUnix", "err", err) - return - } - go rs.handleConnection(conn) - } - }() - - return nil -} - -// OnStop implements cmn.Service. -func (rs *IPCRemoteSigner) OnStop() { - if rs.listener != nil { - if err := rs.listener.Close(); err != nil { - rs.Logger.Error("OnStop", "err", cmn.ErrorWrap(err, "closing listener failed")) - } - } -} - -func (rs *IPCRemoteSigner) listen() error { - la, err := net.ResolveUnixAddr("unix", rs.addr) - if err != nil { - return err - } - - rs.listener, err = net.ListenUnix("unix", la) - - return err -} - -func (rs *IPCRemoteSigner) handleConnection(conn net.Conn) { - for { - if !rs.IsRunning() { - return // Ignore error from listener closing. - } - - // Reset the connection deadline - conn.SetDeadline(time.Now().Add(rs.connDeadline)) - - req, err := readMsg(conn) - if err != nil { - if err != io.EOF { - rs.Logger.Error("handleConnection", "err", err) - } - return - } - - res, err := handleRequest(req, rs.chainID, rs.privVal) - - if err != nil { - // only log the error; we'll reply with an error in res - rs.Logger.Error("handleConnection", "err", err) - } - - err = writeMsg(conn, res) - if err != nil { - rs.Logger.Error("handleConnection", "err", err) - return - } - } -} diff --git a/privval/ipc_test.go b/privval/ipc_test.go deleted file mode 100644 index c8d6dfc7..00000000 --- a/privval/ipc_test.go +++ /dev/null @@ -1,147 +0,0 @@ -package privval - -import ( - "io/ioutil" - "os" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" - "github.com/tendermint/tendermint/types" -) - -func TestIPCPVVote(t *testing.T) { - var ( - chainID = cmn.RandStr(12) - sc, rs = testSetupIPCSocketPair(t, chainID, types.NewMockPV()) - - ts = time.Now() - vType = types.PrecommitType - want = &types.Vote{Timestamp: ts, Type: vType} - have = &types.Vote{Timestamp: ts, Type: vType} - ) - defer sc.Stop() - defer rs.Stop() - - require.NoError(t, rs.privVal.SignVote(chainID, want)) - require.NoError(t, sc.SignVote(chainID, have)) - assert.Equal(t, want.Signature, have.Signature) -} - -func TestIPCPVVoteResetDeadline(t *testing.T) { - var ( - chainID = cmn.RandStr(12) - sc, rs = testSetupIPCSocketPair(t, chainID, types.NewMockPV()) - - ts = time.Now() - vType = types.PrecommitType - want = &types.Vote{Timestamp: ts, Type: vType} - have = &types.Vote{Timestamp: ts, Type: vType} - ) - defer sc.Stop() - defer rs.Stop() - - time.Sleep(3 * time.Millisecond) - - require.NoError(t, rs.privVal.SignVote(chainID, want)) - require.NoError(t, sc.SignVote(chainID, have)) - assert.Equal(t, want.Signature, have.Signature) - - // This would exceed the deadline if it was not extended by the previous message - time.Sleep(3 * time.Millisecond) - - require.NoError(t, rs.privVal.SignVote(chainID, want)) - require.NoError(t, sc.SignVote(chainID, have)) - assert.Equal(t, want.Signature, have.Signature) -} - -func TestIPCPVVoteKeepalive(t *testing.T) { - var ( - chainID = cmn.RandStr(12) - sc, rs = testSetupIPCSocketPair(t, chainID, types.NewMockPV()) - - ts = time.Now() - vType = types.PrecommitType - want = &types.Vote{Timestamp: ts, Type: vType} - have = &types.Vote{Timestamp: ts, Type: vType} - ) - defer sc.Stop() - defer rs.Stop() - - time.Sleep(10 * time.Millisecond) - - require.NoError(t, rs.privVal.SignVote(chainID, want)) - require.NoError(t, sc.SignVote(chainID, have)) - assert.Equal(t, want.Signature, have.Signature) -} - -func testSetupIPCSocketPair( - t *testing.T, - chainID string, - privValidator types.PrivValidator, -) (*IPCVal, *IPCRemoteSigner) { - addr, err := testUnixAddr() - require.NoError(t, err) - - var ( - logger = log.TestingLogger() - privVal = privValidator - readyc = make(chan struct{}) - rs = NewIPCRemoteSigner( - logger, - chainID, - addr, - privVal, - ) - sc = NewIPCVal( - logger, - addr, - ) - ) - - IPCValConnTimeout(5 * time.Millisecond)(sc) - IPCValHeartbeat(time.Millisecond)(sc) - - IPCRemoteSignerConnDeadline(time.Millisecond * 5)(rs) - - testStartIPCRemoteSigner(t, readyc, rs) - - <-readyc - - require.NoError(t, sc.Start()) - assert.True(t, sc.IsRunning()) - - return sc, rs -} - -func testStartIPCRemoteSigner(t *testing.T, readyc chan struct{}, rs *IPCRemoteSigner) { - go func(rs *IPCRemoteSigner) { - require.NoError(t, rs.Start()) - assert.True(t, rs.IsRunning()) - - readyc <- struct{}{} - }(rs) -} - -func testUnixAddr() (string, error) { - f, err := ioutil.TempFile("/tmp", "nettest") - if err != nil { - return "", err - } - - addr := f.Name() - err = f.Close() - if err != nil { - return "", err - } - err = os.Remove(addr) - if err != nil { - return "", err - } - - return addr, nil -} diff --git a/privval/old_priv_validator.go b/privval/old_file.go similarity index 100% rename from privval/old_priv_validator.go rename to privval/old_file.go diff --git a/privval/old_priv_validator_test.go b/privval/old_file_test.go similarity index 100% rename from privval/old_priv_validator_test.go rename to privval/old_file_test.go diff --git a/privval/remote_signer.go b/privval/remote_signer.go index b80884de..d928b198 100644 --- a/privval/remote_signer.go +++ b/privval/remote_signer.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "net" - "sync" "github.com/pkg/errors" @@ -14,31 +13,41 @@ import ( "github.com/tendermint/tendermint/types" ) -// RemoteSignerClient implements PrivValidator, it uses a socket to request signatures +// Socket errors. +var ( + ErrConnTimeout = errors.New("remote signer timed out") +) + +// RemoteSignerClient implements PrivValidator. +// It uses a net.Conn to request signatures // from an external process. type RemoteSignerClient struct { - conn net.Conn + conn net.Conn + + // memoized consensusPubKey crypto.PubKey - mtx sync.Mutex } // Check that RemoteSignerClient implements PrivValidator. var _ types.PrivValidator = (*RemoteSignerClient)(nil) // NewRemoteSignerClient returns an instance of RemoteSignerClient. -func NewRemoteSignerClient( - conn net.Conn, -) (*RemoteSignerClient, error) { - sc := &RemoteSignerClient{ - conn: conn, - } - pubKey, err := sc.getPubKey() +func NewRemoteSignerClient(conn net.Conn) (*RemoteSignerClient, error) { + + // retrieve and memoize the consensus public key once. + pubKey, err := getPubKey(conn) if err != nil { return nil, cmn.ErrorWrap(err, "error while retrieving public key for remote signer") } - // retrieve and memoize the consensus public key once: - sc.consensusPubKey = pubKey - return sc, nil + return &RemoteSignerClient{ + conn: conn, + consensusPubKey: pubKey, + }, nil +} + +// Close calls Close on the underlying net.Conn. +func (sc *RemoteSignerClient) Close() error { + return sc.conn.Close() } // GetPubKey implements PrivValidator. @@ -46,16 +55,14 @@ func (sc *RemoteSignerClient) GetPubKey() crypto.PubKey { return sc.consensusPubKey } -func (sc *RemoteSignerClient) getPubKey() (crypto.PubKey, error) { - sc.mtx.Lock() - defer sc.mtx.Unlock() - - err := writeMsg(sc.conn, &PubKeyRequest{}) +// not thread-safe (only called on startup). +func getPubKey(conn net.Conn) (crypto.PubKey, error) { + err := writeMsg(conn, &PubKeyRequest{}) if err != nil { return nil, err } - res, err := readMsg(sc.conn) + res, err := readMsg(conn) if err != nil { return nil, err } @@ -73,9 +80,6 @@ func (sc *RemoteSignerClient) getPubKey() (crypto.PubKey, error) { // SignVote implements PrivValidator. func (sc *RemoteSignerClient) SignVote(chainID string, vote *types.Vote) error { - sc.mtx.Lock() - defer sc.mtx.Unlock() - err := writeMsg(sc.conn, &SignVoteRequest{Vote: vote}) if err != nil { return err @@ -103,9 +107,6 @@ func (sc *RemoteSignerClient) SignProposal( chainID string, proposal *types.Proposal, ) error { - sc.mtx.Lock() - defer sc.mtx.Unlock() - err := writeMsg(sc.conn, &SignProposalRequest{Proposal: proposal}) if err != nil { return err @@ -129,9 +130,6 @@ func (sc *RemoteSignerClient) SignProposal( // Ping is used to check connection health. func (sc *RemoteSignerClient) Ping() error { - sc.mtx.Lock() - defer sc.mtx.Unlock() - err := writeMsg(sc.conn, &PingRequest{}) if err != nil { return err diff --git a/privval/tcp_server.go b/privval/server.go similarity index 63% rename from privval/tcp_server.go rename to privval/server.go index 694023d7..8b22c69e 100644 --- a/privval/tcp_server.go +++ b/privval/server.go @@ -5,6 +5,7 @@ import ( "net" "time" + "github.com/pkg/errors" "github.com/tendermint/tendermint/crypto/ed25519" cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/libs/log" @@ -12,6 +13,11 @@ import ( "github.com/tendermint/tendermint/types" ) +// Socket errors. +var ( + ErrDialRetryMax = errors.New("dialed maximum retries") +) + // RemoteSignerOption sets an optional parameter on the RemoteSigner. type RemoteSignerOption func(*RemoteSigner) @@ -26,38 +32,64 @@ func RemoteSignerConnRetries(retries int) RemoteSignerOption { return func(ss *RemoteSigner) { ss.connRetries = retries } } -// RemoteSigner implements PrivValidator by dialing to a socket. +// RemoteSigner dials using its dialer and responds to any +// signature requests using its privVal. type RemoteSigner struct { cmn.BaseService - addr string chainID string connDeadline time.Duration connRetries int - privKey ed25519.PrivKeyEd25519 privVal types.PrivValidator - conn net.Conn + dialer Dialer + conn net.Conn } -// NewRemoteSigner returns an instance of RemoteSigner. +// Dialer dials a remote address and returns a net.Conn or an error. +type Dialer func() (net.Conn, error) + +// DialTCPFn dials the given tcp addr, using the given connTimeout and privKey for the +// authenticated encryption handshake. +func DialTCPFn(addr string, connTimeout time.Duration, privKey ed25519.PrivKeyEd25519) Dialer { + return func() (net.Conn, error) { + conn, err := cmn.Connect(addr) + if err == nil { + err = conn.SetDeadline(time.Now().Add(connTimeout)) + } + if err == nil { + conn, err = p2pconn.MakeSecretConnection(conn, privKey) + } + return conn, err + } +} + +// DialUnixFn dials the given unix socket. +func DialUnixFn(addr string) Dialer { + return func() (net.Conn, error) { + unixAddr := &net.UnixAddr{addr, "unix"} + return net.DialUnix("unix", nil, unixAddr) + } +} + +// NewRemoteSigner return a RemoteSigner that will dial using the given +// dialer and respond to any signature requests over the connection +// using the given privVal. func NewRemoteSigner( logger log.Logger, - chainID, socketAddr string, + chainID string, privVal types.PrivValidator, - privKey ed25519.PrivKeyEd25519, + dialer Dialer, ) *RemoteSigner { rs := &RemoteSigner{ - addr: socketAddr, chainID: chainID, connDeadline: time.Second * defaultConnDeadlineSeconds, connRetries: defaultDialRetries, - privKey: privKey, privVal: privVal, + dialer: dialer, } rs.BaseService = *cmn.NewBaseService(logger, "RemoteSigner", rs) - return rs } @@ -68,6 +100,7 @@ func (rs *RemoteSigner) OnStart() error { rs.Logger.Error("OnStart", "err", err) return err } + rs.conn = conn go rs.handleConnection(conn) @@ -91,36 +124,11 @@ func (rs *RemoteSigner) connect() (net.Conn, error) { if retries != rs.connRetries { time.Sleep(rs.connDeadline) } - - conn, err := cmn.Connect(rs.addr) + conn, err := rs.dialer() if err != nil { - rs.Logger.Error( - "connect", - "addr", rs.addr, - "err", err, - ) - + rs.Logger.Error("dialing", "err", err) continue } - - if err := conn.SetDeadline(time.Now().Add(connTimeout)); err != nil { - rs.Logger.Error( - "connect", - "err", err, - ) - continue - } - - conn, err = p2pconn.MakeSecretConnection(conn, rs.privKey) - if err != nil { - rs.Logger.Error( - "connect", - "err", err, - ) - - continue - } - return conn, nil } @@ -139,7 +147,7 @@ func (rs *RemoteSigner) handleConnection(conn net.Conn) { req, err := readMsg(conn) if err != nil { if err != io.EOF { - rs.Logger.Error("handleConnection", "err", err) + rs.Logger.Error("handleConnection readMsg", "err", err) } return } @@ -148,12 +156,12 @@ func (rs *RemoteSigner) handleConnection(conn net.Conn) { if err != nil { // only log the error; we'll reply with an error in res - rs.Logger.Error("handleConnection", "err", err) + rs.Logger.Error("handleConnection handleRequest", "err", err) } err = writeMsg(conn, res) if err != nil { - rs.Logger.Error("handleConnection", "err", err) + rs.Logger.Error("handleConnection writeMsg", "err", err) return } } diff --git a/privval/socket.go b/privval/socket.go new file mode 100644 index 00000000..96fa6c8e --- /dev/null +++ b/privval/socket.go @@ -0,0 +1,184 @@ +package privval + +import ( + "net" + "time" + + "github.com/tendermint/tendermint/crypto/ed25519" + p2pconn "github.com/tendermint/tendermint/p2p/conn" +) + +const ( + defaultAcceptDeadlineSeconds = 3 + defaultConnDeadlineSeconds = 3 +) + +// timeoutError can be used to check if an error returned from the netp package +// was due to a timeout. +type timeoutError interface { + Timeout() bool +} + +//------------------------------------------------------------------ +// TCP Listener + +// TCPListenerOption sets an optional parameter on the tcpListener. +type TCPListenerOption func(*tcpListener) + +// TCPListenerAcceptDeadline sets the deadline for the listener. +// A zero time value disables the deadline. +func TCPListenerAcceptDeadline(deadline time.Duration) TCPListenerOption { + return func(tl *tcpListener) { tl.acceptDeadline = deadline } +} + +// TCPListenerConnDeadline sets the read and write deadline for connections +// from external signing processes. +func TCPListenerConnDeadline(deadline time.Duration) TCPListenerOption { + return func(tl *tcpListener) { tl.connDeadline = deadline } +} + +// tcpListener implements net.Listener. +var _ net.Listener = (*tcpListener)(nil) + +// tcpListener wraps a *net.TCPListener to standardise protocol timeouts +// and potentially other tuning parameters. It also returns encrypted connections. +type tcpListener struct { + *net.TCPListener + + secretConnKey ed25519.PrivKeyEd25519 + + acceptDeadline time.Duration + connDeadline time.Duration +} + +// NewTCPListener returns a listener that accepts authenticated encrypted connections +// using the given secretConnKey and the default timeout values. +func NewTCPListener(ln net.Listener, secretConnKey ed25519.PrivKeyEd25519) *tcpListener { + return &tcpListener{ + TCPListener: ln.(*net.TCPListener), + secretConnKey: secretConnKey, + acceptDeadline: time.Second * defaultAcceptDeadlineSeconds, + connDeadline: time.Second * defaultConnDeadlineSeconds, + } +} + +// Accept implements net.Listener. +func (ln *tcpListener) Accept() (net.Conn, error) { + err := ln.SetDeadline(time.Now().Add(ln.acceptDeadline)) + if err != nil { + return nil, err + } + + tc, err := ln.AcceptTCP() + if err != nil { + return nil, err + } + + // Wrap the conn in our timeout and encryption wrappers + timeoutConn := newTimeoutConn(tc, ln.connDeadline) + secretConn, err := p2pconn.MakeSecretConnection(timeoutConn, ln.secretConnKey) + if err != nil { + return nil, err + } + + return secretConn, nil +} + +//------------------------------------------------------------------ +// Unix Listener + +// unixListener implements net.Listener. +var _ net.Listener = (*unixListener)(nil) + +type UnixListenerOption func(*unixListener) + +// UnixListenerAcceptDeadline sets the deadline for the listener. +// A zero time value disables the deadline. +func UnixListenerAcceptDeadline(deadline time.Duration) UnixListenerOption { + return func(ul *unixListener) { ul.acceptDeadline = deadline } +} + +// UnixListenerConnDeadline sets the read and write deadline for connections +// from external signing processes. +func UnixListenerConnDeadline(deadline time.Duration) UnixListenerOption { + return func(ul *unixListener) { ul.connDeadline = deadline } +} + +// unixListener wraps a *net.UnixListener to standardise protocol timeouts +// and potentially other tuning parameters. It returns unencrypted connections. +type unixListener struct { + *net.UnixListener + + acceptDeadline time.Duration + connDeadline time.Duration +} + +// NewUnixListener returns a listener that accepts unencrypted connections +// using the default timeout values. +func NewUnixListener(ln net.Listener) *unixListener { + return &unixListener{ + UnixListener: ln.(*net.UnixListener), + acceptDeadline: time.Second * defaultAcceptDeadlineSeconds, + connDeadline: time.Second * defaultConnDeadlineSeconds, + } +} + +// Accept implements net.Listener. +func (ln *unixListener) Accept() (net.Conn, error) { + err := ln.SetDeadline(time.Now().Add(ln.acceptDeadline)) + if err != nil { + return nil, err + } + + tc, err := ln.AcceptUnix() + if err != nil { + return nil, err + } + + // Wrap the conn in our timeout wrapper + conn := newTimeoutConn(tc, ln.connDeadline) + + // TODO: wrap in something that authenticates + // with a MAC - https://github.com/tendermint/tendermint/issues/3099 + + return conn, nil +} + +//------------------------------------------------------------------ +// Connection + +// timeoutConn implements net.Conn. +var _ net.Conn = (*timeoutConn)(nil) + +// timeoutConn wraps a net.Conn to standardise protocol timeouts / deadline resets. +type timeoutConn struct { + net.Conn + + connDeadline time.Duration +} + +// newTimeoutConn returns an instance of newTCPTimeoutConn. +func newTimeoutConn( + conn net.Conn, + connDeadline time.Duration) *timeoutConn { + return &timeoutConn{ + conn, + connDeadline, + } +} + +// Read implements net.Conn. +func (c timeoutConn) Read(b []byte) (n int, err error) { + // Reset deadline + c.Conn.SetReadDeadline(time.Now().Add(c.connDeadline)) + + return c.Conn.Read(b) +} + +// Write implements net.Conn. +func (c timeoutConn) Write(b []byte) (n int, err error) { + // Reset deadline + c.Conn.SetWriteDeadline(time.Now().Add(c.connDeadline)) + + return c.Conn.Write(b) +} diff --git a/privval/tcp_socket_test.go b/privval/socket_test.go similarity index 52% rename from privval/tcp_socket_test.go rename to privval/socket_test.go index 285e73ed..0c05fa3a 100644 --- a/privval/tcp_socket_test.go +++ b/privval/socket_test.go @@ -4,17 +4,31 @@ import ( "net" "testing" "time" + + "github.com/tendermint/tendermint/crypto/ed25519" ) -func TestTCPTimeoutListenerAcceptDeadline(t *testing.T) { +//------------------------------------------- +// helper funcs + +func newPrivKey() ed25519.PrivKeyEd25519 { + return ed25519.GenPrivKey() +} + +//------------------------------------------- +// tests + +func TestTCPListenerAcceptDeadline(t *testing.T) { ln, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { t.Fatal(err) } - ln = newTCPTimeoutListener(ln, time.Millisecond, time.Second, time.Second) + tcpLn := NewTCPListener(ln, newPrivKey()) + TCPListenerAcceptDeadline(time.Millisecond)(tcpLn) + TCPListenerConnDeadline(time.Second)(tcpLn) - _, err = ln.Accept() + _, err = tcpLn.Accept() opErr, ok := err.(*net.OpError) if !ok { t.Fatalf("have %v, want *net.OpError", err) @@ -25,14 +39,17 @@ func TestTCPTimeoutListenerAcceptDeadline(t *testing.T) { } } -func TestTCPTimeoutListenerConnDeadline(t *testing.T) { +func TestTCPListenerConnDeadline(t *testing.T) { ln, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { t.Fatal(err) } - ln = newTCPTimeoutListener(ln, time.Second, time.Millisecond, time.Second) + tcpLn := NewTCPListener(ln, newPrivKey()) + TCPListenerAcceptDeadline(time.Second)(tcpLn) + TCPListenerConnDeadline(time.Millisecond)(tcpLn) + readyc := make(chan struct{}) donec := make(chan struct{}) go func(ln net.Listener) { defer close(donec) @@ -41,6 +58,7 @@ func TestTCPTimeoutListenerConnDeadline(t *testing.T) { if err != nil { t.Fatal(err) } + <-readyc time.Sleep(2 * time.Millisecond) @@ -54,12 +72,13 @@ func TestTCPTimeoutListenerConnDeadline(t *testing.T) { if have, want := opErr.Op, "read"; have != want { t.Errorf("have %v, want %v", have, want) } - }(ln) + }(tcpLn) - _, err = net.Dial("tcp", ln.Addr().String()) + dialer := DialTCPFn(ln.Addr().String(), testConnDeadline, newPrivKey()) + _, err = dialer() if err != nil { t.Fatal(err) } - + close(readyc) <-donec } diff --git a/privval/tcp.go b/privval/tcp.go deleted file mode 100644 index 1fb736e6..00000000 --- a/privval/tcp.go +++ /dev/null @@ -1,216 +0,0 @@ -package privval - -import ( - "errors" - "net" - "time" - - "github.com/tendermint/tendermint/crypto/ed25519" - cmn "github.com/tendermint/tendermint/libs/common" - "github.com/tendermint/tendermint/libs/log" - p2pconn "github.com/tendermint/tendermint/p2p/conn" - "github.com/tendermint/tendermint/types" -) - -const ( - defaultAcceptDeadlineSeconds = 3 - defaultConnDeadlineSeconds = 3 - defaultConnHeartBeatSeconds = 2 - defaultDialRetries = 10 -) - -// Socket errors. -var ( - ErrDialRetryMax = errors.New("dialed maximum retries") - ErrConnTimeout = errors.New("remote signer timed out") - ErrUnexpectedResponse = errors.New("received unexpected response") -) - -var ( - acceptDeadline = time.Second * defaultAcceptDeadlineSeconds - connTimeout = time.Second * defaultConnDeadlineSeconds - connHeartbeat = time.Second * defaultConnHeartBeatSeconds -) - -// TCPValOption sets an optional parameter on the SocketPV. -type TCPValOption func(*TCPVal) - -// TCPValAcceptDeadline sets the deadline for the TCPVal listener. -// A zero time value disables the deadline. -func TCPValAcceptDeadline(deadline time.Duration) TCPValOption { - return func(sc *TCPVal) { sc.acceptDeadline = deadline } -} - -// TCPValConnTimeout sets the read and write timeout for connections -// from external signing processes. -func TCPValConnTimeout(timeout time.Duration) TCPValOption { - return func(sc *TCPVal) { sc.connTimeout = timeout } -} - -// TCPValHeartbeat sets the period on which to check the liveness of the -// connected Signer connections. -func TCPValHeartbeat(period time.Duration) TCPValOption { - return func(sc *TCPVal) { sc.connHeartbeat = period } -} - -// TCPVal implements PrivValidator, it uses a socket to request signatures -// from an external process. -type TCPVal struct { - cmn.BaseService - *RemoteSignerClient - - addr string - acceptDeadline time.Duration - connTimeout time.Duration - connHeartbeat time.Duration - privKey ed25519.PrivKeyEd25519 - - conn net.Conn - listener net.Listener - cancelPing chan struct{} - pingTicker *time.Ticker -} - -// Check that TCPVal implements PrivValidator. -var _ types.PrivValidator = (*TCPVal)(nil) - -// NewTCPVal returns an instance of TCPVal. -func NewTCPVal( - logger log.Logger, - socketAddr string, - privKey ed25519.PrivKeyEd25519, -) *TCPVal { - sc := &TCPVal{ - addr: socketAddr, - acceptDeadline: acceptDeadline, - connTimeout: connTimeout, - connHeartbeat: connHeartbeat, - privKey: privKey, - } - - sc.BaseService = *cmn.NewBaseService(logger, "TCPVal", sc) - - return sc -} - -// OnStart implements cmn.Service. -func (sc *TCPVal) OnStart() error { - if err := sc.listen(); err != nil { - sc.Logger.Error("OnStart", "err", err) - return err - } - - conn, err := sc.waitConnection() - if err != nil { - sc.Logger.Error("OnStart", "err", err) - return err - } - - sc.conn = conn - sc.RemoteSignerClient, err = NewRemoteSignerClient(sc.conn) - if err != nil { - return err - } - - // Start a routine to keep the connection alive - sc.cancelPing = make(chan struct{}, 1) - sc.pingTicker = time.NewTicker(sc.connHeartbeat) - go func() { - for { - select { - case <-sc.pingTicker.C: - err := sc.Ping() - if err != nil { - sc.Logger.Error( - "Ping", - "err", err, - ) - } - case <-sc.cancelPing: - sc.pingTicker.Stop() - return - } - } - }() - - return nil -} - -// OnStop implements cmn.Service. -func (sc *TCPVal) OnStop() { - if sc.cancelPing != nil { - close(sc.cancelPing) - } - - if sc.conn != nil { - if err := sc.conn.Close(); err != nil { - sc.Logger.Error("OnStop", "err", err) - } - } - - if sc.listener != nil { - if err := sc.listener.Close(); err != nil { - sc.Logger.Error("OnStop", "err", err) - } - } -} - -func (sc *TCPVal) acceptConnection() (net.Conn, error) { - conn, err := sc.listener.Accept() - if err != nil { - if !sc.IsRunning() { - return nil, nil // Ignore error from listener closing. - } - return nil, err - - } - - conn, err = p2pconn.MakeSecretConnection(conn, sc.privKey) - if err != nil { - return nil, err - } - - return conn, nil -} - -func (sc *TCPVal) listen() error { - ln, err := net.Listen(cmn.ProtocolAndAddress(sc.addr)) - if err != nil { - return err - } - - sc.listener = newTCPTimeoutListener( - ln, - sc.acceptDeadline, - sc.connTimeout, - sc.connHeartbeat, - ) - - return nil -} - -// waitConnection uses the configured wait timeout to error if no external -// process connects in the time period. -func (sc *TCPVal) waitConnection() (net.Conn, error) { - var ( - connc = make(chan net.Conn, 1) - errc = make(chan error, 1) - ) - - go func(connc chan<- net.Conn, errc chan<- error) { - conn, err := sc.acceptConnection() - if err != nil { - errc <- err - return - } - - connc <- conn - }(connc, errc) - - select { - case conn := <-connc: - return conn, nil - case err := <-errc: - return nil, err - } -} diff --git a/privval/tcp_socket.go b/privval/tcp_socket.go deleted file mode 100644 index 2b17bf26..00000000 --- a/privval/tcp_socket.go +++ /dev/null @@ -1,90 +0,0 @@ -package privval - -import ( - "net" - "time" -) - -// timeoutError can be used to check if an error returned from the netp package -// was due to a timeout. -type timeoutError interface { - Timeout() bool -} - -// tcpTimeoutListener implements net.Listener. -var _ net.Listener = (*tcpTimeoutListener)(nil) - -// tcpTimeoutListener wraps a *net.TCPListener to standardise protocol timeouts -// and potentially other tuning parameters. -type tcpTimeoutListener struct { - *net.TCPListener - - acceptDeadline time.Duration - connDeadline time.Duration - period time.Duration -} - -// timeoutConn wraps a net.Conn to standardise protocol timeouts / deadline resets. -type timeoutConn struct { - net.Conn - - connDeadline time.Duration -} - -// newTCPTimeoutListener returns an instance of tcpTimeoutListener. -func newTCPTimeoutListener( - ln net.Listener, - acceptDeadline, connDeadline time.Duration, - period time.Duration, -) tcpTimeoutListener { - return tcpTimeoutListener{ - TCPListener: ln.(*net.TCPListener), - acceptDeadline: acceptDeadline, - connDeadline: connDeadline, - period: period, - } -} - -// newTimeoutConn returns an instance of newTCPTimeoutConn. -func newTimeoutConn( - conn net.Conn, - connDeadline time.Duration) *timeoutConn { - return &timeoutConn{ - conn, - connDeadline, - } -} - -// Accept implements net.Listener. -func (ln tcpTimeoutListener) Accept() (net.Conn, error) { - err := ln.SetDeadline(time.Now().Add(ln.acceptDeadline)) - if err != nil { - return nil, err - } - - tc, err := ln.AcceptTCP() - if err != nil { - return nil, err - } - - // Wrap the conn in our timeout wrapper - conn := newTimeoutConn(tc, ln.connDeadline) - - return conn, nil -} - -// Read implements net.Listener. -func (c timeoutConn) Read(b []byte) (n int, err error) { - // Reset deadline - c.Conn.SetReadDeadline(time.Now().Add(c.connDeadline)) - - return c.Conn.Read(b) -} - -// Write implements net.Listener. -func (c timeoutConn) Write(b []byte) (n int, err error) { - // Reset deadline - c.Conn.SetWriteDeadline(time.Now().Add(c.connDeadline)) - - return c.Conn.Write(b) -}