mirror of
https://github.com/fluencelabs/tendermint
synced 2025-07-30 19:51:58 +00:00
Compare commits
7 Commits
v0.28.0-de
...
breaking
Author | SHA1 | Date | |
---|---|---|---|
|
1895cde590 | ||
|
be00cd1add | ||
|
a6011c007d | ||
|
ef94a322b8 | ||
|
7f607d0ce2 | ||
|
81c51cd4fc | ||
|
51094f9417 |
@@ -1,4 +1,4 @@
|
||||
## v0.27.4
|
||||
## v0.28.0
|
||||
|
||||
*TBD*
|
||||
|
||||
@@ -10,27 +10,31 @@ Special thanks to external contributors on this release:
|
||||
- [cli] Removed `node` `--proxy_app=dummy` option. Use `kvstore` (`persistent_kvstore`) instead.
|
||||
- [cli] Renamed `node` `--proxy_app=nilapp` to `--proxy_app=noop`.
|
||||
- [config] \#2992 `allow_duplicate_ip` is now set to false
|
||||
|
||||
- [privval] \#2926 split up `PubKeyMsg` into `PubKeyRequest` and `PubKeyResponse` to be consistent with other message types
|
||||
- [privval] \#2923 listen for unix socket connections instead of dialing them
|
||||
|
||||
* Apps
|
||||
|
||||
* Go API
|
||||
- [types] \#2926 memoize consensus public key on initialization of remote signer and return the memoized key on
|
||||
`PrivValidator.GetPubKey()` instead of requesting it again
|
||||
* Go API
|
||||
- [types] \#2926 memoize consensus public key on initialization of remote signer and return the memoized key on
|
||||
`PrivValidator.GetPubKey()` instead of requesting it again
|
||||
- [types] \#2981 Remove `PrivValidator.GetAddress()`
|
||||
|
||||
* Blockchain Protocol
|
||||
|
||||
* P2P Protocol
|
||||
- multiple connections from the same IP are now disabled by default (see `allow_duplicate_ip` config option)
|
||||
|
||||
### FEATURES:
|
||||
- [privval] \#1181 Split immutable and mutable parts of priv_validator.json
|
||||
- [privval] \#1181 Split immutable and mutable parts of `priv_validator.json`
|
||||
|
||||
### IMPROVEMENTS:
|
||||
- [p2p/conn] \#3111 make SecretConnection thread safe
|
||||
- [privval] \#2923 retry RemoteSigner connections on error
|
||||
- [rpc] \#3047 Include peer's remote IP in `/net_info`
|
||||
|
||||
### BUG FIXES:
|
||||
|
||||
- [types] \#2926 do not panic if retrieving the private validator's public key fails
|
||||
- [rpc] \#3080 check if the variable "skipCount" is bigger than zero. If it is not, we set it to 0. If it, we do not do anything.
|
||||
- [crypto/multisig] \#3102 fix multisig keys address length
|
||||
- [crypto/encoding] \#3101 Fix `PubKeyMultisigThreshold` unmarshalling into `crypto.PubKey` interface
|
||||
|
93
README.md
93
README.md
@@ -1,8 +1,8 @@
|
||||
# Tendermint
|
||||
|
||||
[Byzantine-Fault Tolerant](https://en.wikipedia.org/wiki/Byzantine_fault_tolerance)
|
||||
[State Machine Replication](https://en.wikipedia.org/wiki/State_machine_replication).
|
||||
Or [Blockchain](https://en.wikipedia.org/wiki/Blockchain_(database)) for short.
|
||||
[State Machines](https://en.wikipedia.org/wiki/State_machine_replication).
|
||||
Or [Blockchain](https://en.wikipedia.org/wiki/Blockchain_(database)), for short.
|
||||
|
||||
[](https://github.com/tendermint/tendermint/releases/latest)
|
||||
[
|
||||
- [Remote cluster using terraform and ansible](/docs/networks/terraform-and-ansible.md)
|
||||
- [Join the Cosmos testnet](https://cosmos.network/testnet)
|
||||
|
||||
## Resources
|
||||
|
||||
### Tendermint Core
|
||||
|
||||
For details about the blockchain data structures and the p2p protocols, see the
|
||||
the [Tendermint specification](/docs/spec).
|
||||
|
||||
For details on using the software, see the [documentation](/docs/) which is also
|
||||
hosted at: https://tendermint.com/docs/
|
||||
|
||||
### Tools
|
||||
|
||||
Benchmarking and monitoring is provided by `tm-bench` and `tm-monitor`, respectively.
|
||||
Their code is found [here](/tools) and these binaries need to be built seperately.
|
||||
Additional documentation is found [here](/docs/tools).
|
||||
|
||||
### Sub-projects
|
||||
|
||||
* [Amino](http://github.com/tendermint/go-amino), a reflection-based improvement on proto3
|
||||
* [IAVL](http://github.com/tendermint/iavl), Merkleized IAVL+ Tree implementation
|
||||
|
||||
### Applications
|
||||
|
||||
* [Cosmos SDK](http://github.com/cosmos/cosmos-sdk); a cryptocurrency application framework
|
||||
* [Ethermint](http://github.com/cosmos/ethermint); Ethereum on Tendermint
|
||||
* [Many more](https://tendermint.com/ecosystem)
|
||||
|
||||
### Research
|
||||
|
||||
* [The latest gossip on BFT consensus](https://arxiv.org/abs/1807.04938)
|
||||
* [Master's Thesis on Tendermint](https://atrium.lib.uoguelph.ca/xmlui/handle/10214/9769)
|
||||
* [Original Whitepaper](https://tendermint.com/static/docs/tendermint.pdf)
|
||||
* [Blog](https://blog.cosmos.network/tendermint/home)
|
||||
|
||||
## Contributing
|
||||
|
||||
Yay open source! Please see our [contributing guidelines](CONTRIBUTING.md).
|
||||
Please abide by the [Code of Conduct](CODE_OF_CONDUCT.md) in all interactions,
|
||||
and the [contributing guidelines](CONTRIBUTING.md) when submitting code.
|
||||
|
||||
Join the larger community on the [forum](https://forum.cosmos.network/) and the [chat](https://riot.im/app/#/room/#tendermint:matrix.org).
|
||||
|
||||
To learn more about the structure of the software, watch the [Developer
|
||||
Sessions](https://www.youtube.com/playlist?list=PLdQIb0qr3pnBbG5ZG-0gr3zM86_s8Rpqv)
|
||||
and read some [Architectural
|
||||
Decision Records](https://github.com/tendermint/tendermint/tree/master/docs/architecture).
|
||||
|
||||
Learn more by reading the code and comparing it to the
|
||||
[specification](https://github.com/tendermint/tendermint/tree/develop/docs/spec).
|
||||
|
||||
## Versioning
|
||||
|
||||
### SemVer
|
||||
### Semantic Versioning
|
||||
|
||||
Tendermint uses [SemVer](http://semver.org/) to determine when and how the version changes.
|
||||
Tendermint uses [Semantic Versioning](http://semver.org/) to determine when and how the version changes.
|
||||
According to SemVer, anything in the public API can change at any time before version 1.0.0
|
||||
|
||||
To provide some stability to Tendermint users in these 0.X.X days, the MINOR version is used
|
||||
@@ -145,8 +122,40 @@ data into the new chain.
|
||||
However, any bump in the PATCH version should be compatible with existing histories
|
||||
(if not please open an [issue](https://github.com/tendermint/tendermint/issues)).
|
||||
|
||||
For more information on upgrading, see [here](./UPGRADING.md)
|
||||
For more information on upgrading, see [UPGRADING.md](./UPGRADING.md)
|
||||
|
||||
## Code of Conduct
|
||||
## Resources
|
||||
|
||||
### Tendermint Core
|
||||
|
||||
For details about the blockchain data structures and the p2p protocols, see the
|
||||
[Tendermint specification](/docs/spec).
|
||||
|
||||
For details on using the software, see the [documentation](/docs/) which is also
|
||||
hosted at: https://tendermint.com/docs/
|
||||
|
||||
### Tools
|
||||
|
||||
Benchmarking and monitoring is provided by `tm-bench` and `tm-monitor`, respectively.
|
||||
Their code is found [here](/tools) and these binaries need to be built seperately.
|
||||
Additional documentation is found [here](/docs/tools).
|
||||
|
||||
### Sub-projects
|
||||
|
||||
* [Amino](http://github.com/tendermint/go-amino), reflection-based proto3, with
|
||||
interfaces
|
||||
* [IAVL](http://github.com/tendermint/iavl), Merkleized IAVL+ Tree implementation
|
||||
|
||||
### Applications
|
||||
|
||||
* [Cosmos SDK](http://github.com/cosmos/cosmos-sdk); a cryptocurrency application framework
|
||||
* [Ethermint](http://github.com/cosmos/ethermint); Ethereum on Tendermint
|
||||
* [Many more](https://tendermint.com/ecosystem)
|
||||
|
||||
### Research
|
||||
|
||||
* [The latest gossip on BFT consensus](https://arxiv.org/abs/1807.04938)
|
||||
* [Master's Thesis on Tendermint](https://atrium.lib.uoguelph.ca/xmlui/handle/10214/9769)
|
||||
* [Original Whitepaper](https://tendermint.com/static/docs/tendermint.pdf)
|
||||
* [Blog](https://blog.cosmos.network/tendermint/home)
|
||||
|
||||
Please read, understand and adhere to our [code of conduct](CODE_OF_CONDUCT.md).
|
||||
|
@@ -3,6 +3,7 @@ package main
|
||||
import (
|
||||
"flag"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
@@ -34,13 +35,20 @@ func main() {
|
||||
|
||||
pv := privval.LoadFilePV(*privValKeyPath, *privValStatePath)
|
||||
|
||||
rs := privval.NewRemoteSigner(
|
||||
logger,
|
||||
*chainID,
|
||||
*addr,
|
||||
pv,
|
||||
ed25519.GenPrivKey(),
|
||||
)
|
||||
var dialer privval.Dialer
|
||||
protocol, address := cmn.ProtocolAndAddress(*addr)
|
||||
switch protocol {
|
||||
case "unix":
|
||||
dialer = privval.DialUnixFn(address)
|
||||
case "tcp":
|
||||
connTimeout := 3 * time.Second // TODO
|
||||
dialer = privval.DialTCPFn(address, connTimeout, ed25519.GenPrivKey())
|
||||
default:
|
||||
logger.Error("Unknown protocol", "protocol", protocol)
|
||||
return
|
||||
}
|
||||
|
||||
rs := privval.NewRemoteSigner(logger, *chainID, pv, dialer)
|
||||
err := rs.Start()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
|
@@ -126,6 +126,312 @@ func TestConsensusXXX(t *testing.T) {
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
## Consensus Executor
|
||||
|
||||
## Consensus Core
|
||||
|
||||
```go
|
||||
type Event interface{}
|
||||
|
||||
type EventNewHeight struct {
|
||||
Height int64
|
||||
ValidatorId int
|
||||
}
|
||||
|
||||
type EventNewRound HeightAndRound
|
||||
|
||||
type EventProposal struct {
|
||||
Height int64
|
||||
Round int
|
||||
Timestamp Time
|
||||
BlockID BlockID
|
||||
POLRound int
|
||||
Sender int
|
||||
}
|
||||
|
||||
type Majority23PrevotesBlock struct {
|
||||
Height int64
|
||||
Round int
|
||||
BlockID BlockID
|
||||
}
|
||||
|
||||
type Majority23PrecommitBlock struct {
|
||||
Height int64
|
||||
Round int
|
||||
BlockID BlockID
|
||||
}
|
||||
|
||||
type HeightAndRound struct {
|
||||
Height int64
|
||||
Round int
|
||||
}
|
||||
|
||||
type Majority23PrevotesAny HeightAndRound
|
||||
type Majority23PrecommitAny HeightAndRound
|
||||
type TimeoutPropose HeightAndRound
|
||||
type TimeoutPrevotes HeightAndRound
|
||||
type TimeoutPrecommit HeightAndRound
|
||||
|
||||
|
||||
type Message interface{}
|
||||
|
||||
type MessageProposal struct {
|
||||
Height int64
|
||||
Round int
|
||||
BlockID BlockID
|
||||
POLRound int
|
||||
}
|
||||
|
||||
type VoteType int
|
||||
|
||||
const (
|
||||
VoteTypeUnknown VoteType = iota
|
||||
Prevote
|
||||
Precommit
|
||||
)
|
||||
|
||||
|
||||
type MessageVote struct {
|
||||
Height int64
|
||||
Round int
|
||||
BlockID BlockID
|
||||
Type VoteType
|
||||
}
|
||||
|
||||
|
||||
type MessageDecision struct {
|
||||
Height int64
|
||||
Round int
|
||||
BlockID BlockID
|
||||
}
|
||||
|
||||
type TriggerTimeout struct {
|
||||
Height int64
|
||||
Round int
|
||||
Duration Duration
|
||||
}
|
||||
|
||||
|
||||
type RoundStep int
|
||||
|
||||
const (
|
||||
RoundStepUnknown RoundStep = iota
|
||||
RoundStepPropose
|
||||
RoundStepPrevote
|
||||
RoundStepPrecommit
|
||||
RoundStepCommit
|
||||
)
|
||||
|
||||
type State struct {
|
||||
Height int64
|
||||
Round int
|
||||
Step RoundStep
|
||||
LockedValue BlockID
|
||||
LockedRound int
|
||||
ValidValue BlockID
|
||||
ValidRound int
|
||||
ValidatorId int
|
||||
ValidatorSetSize int
|
||||
}
|
||||
|
||||
func proposer(height int64, round int) int {}
|
||||
func getValue() BlockID {}
|
||||
|
||||
func Consensus(event Event, state State) (State, Message, TriggerTimeout) {
|
||||
msg = nil
|
||||
timeout = nil
|
||||
switch event := event.(type) {
|
||||
case EventNewHeight:
|
||||
if event.Height > state.Height {
|
||||
state.Height = event.Height
|
||||
state.Round = -1
|
||||
state.Step = RoundStepPropose
|
||||
state.LockedValue = nil
|
||||
state.LockedRound = -1
|
||||
state.ValidValue = nil
|
||||
state.ValidRound = -1
|
||||
state.ValidatorId = event.ValidatorId
|
||||
}
|
||||
return state, msg, timeout
|
||||
|
||||
case EventNewRound:
|
||||
if event.Height == state.Height and event.Round > state.Round {
|
||||
state.Round = eventRound
|
||||
state.Step = RoundStepPropose
|
||||
if proposer(state.Height, state.Round) == state.ValidatorId {
|
||||
proposal = state.ValidValue
|
||||
if proposal == nil {
|
||||
proposal = getValue()
|
||||
}
|
||||
msg = MessageProposal { state.Height, state.Round, proposal, state.ValidRound }
|
||||
}
|
||||
timeout = TriggerTimeout { state.Height, state.Round, timeoutPropose(state.Round) }
|
||||
}
|
||||
return state, msg, timeout
|
||||
|
||||
case EventProposal:
|
||||
if event.Height == state.Height and event.Round == state.Round and
|
||||
event.Sender == proposal(state.Height, state.Round) and state.Step == RoundStepPropose {
|
||||
if event.POLRound >= state.LockedRound or event.BlockID == state.BlockID or state.LockedRound == -1 {
|
||||
msg = MessageVote { state.Height, state.Round, event.BlockID, Prevote }
|
||||
}
|
||||
state.Step = RoundStepPrevote
|
||||
}
|
||||
return state, msg, timeout
|
||||
|
||||
case TimeoutPropose:
|
||||
if event.Height == state.Height and event.Round == state.Round and state.Step == RoundStepPropose {
|
||||
msg = MessageVote { state.Height, state.Round, nil, Prevote }
|
||||
state.Step = RoundStepPrevote
|
||||
}
|
||||
return state, msg, timeout
|
||||
|
||||
case Majority23PrevotesBlock:
|
||||
if event.Height == state.Height and event.Round == state.Round and state.Step >= RoundStepPrevote and event.Round > state.ValidRound {
|
||||
state.ValidRound = event.Round
|
||||
state.ValidValue = event.BlockID
|
||||
if state.Step == RoundStepPrevote {
|
||||
state.LockedRound = event.Round
|
||||
state.LockedValue = event.BlockID
|
||||
msg = MessageVote { state.Height, state.Round, event.BlockID, Precommit }
|
||||
state.Step = RoundStepPrecommit
|
||||
}
|
||||
}
|
||||
return state, msg, timeout
|
||||
|
||||
case Majority23PrevotesAny:
|
||||
if event.Height == state.Height and event.Round == state.Round and state.Step == RoundStepPrevote {
|
||||
timeout = TriggerTimeout { state.Height, state.Round, timeoutPrevote(state.Round) }
|
||||
}
|
||||
return state, msg, timeout
|
||||
|
||||
case TimeoutPrevote:
|
||||
if event.Height == state.Height and event.Round == state.Round and state.Step == RoundStepPrevote {
|
||||
msg = MessageVote { state.Height, state.Round, nil, Precommit }
|
||||
state.Step = RoundStepPrecommit
|
||||
}
|
||||
return state, msg, timeout
|
||||
|
||||
case Majority23PrecommitBlock:
|
||||
if event.Height == state.Height {
|
||||
state.Step = RoundStepCommit
|
||||
state.LockedValue = event.BlockID
|
||||
}
|
||||
return state, msg, timeout
|
||||
|
||||
case Majority23PrecommitAny:
|
||||
if event.Height == state.Height and event.Round == state.Round {
|
||||
timeout = TriggerTimeout { state.Height, state.Round, timeoutPrecommit(state.Round) }
|
||||
}
|
||||
return state, msg, timeout
|
||||
|
||||
case TimeoutPrecommit:
|
||||
if event.Height == state.Height and event.Round == state.Round {
|
||||
state.Round = state.Round + 1
|
||||
}
|
||||
return state, msg, timeout
|
||||
}
|
||||
}
|
||||
|
||||
func ConsensusExecutor() {
|
||||
proposal = nil
|
||||
votes = HeightVoteSet { Height: 1 }
|
||||
state = State {
|
||||
Height: 1
|
||||
Round: 0
|
||||
Step: RoundStepPropose
|
||||
LockedValue: nil
|
||||
LockedRound: -1
|
||||
ValidValue: nil
|
||||
ValidRound: -1
|
||||
}
|
||||
|
||||
event = EventNewHeight {1, id}
|
||||
state, msg, timeout = Consensus(event, state)
|
||||
|
||||
event = EventNewRound {state.Height, 0}
|
||||
state, msg, timeout = Consensus(event, state)
|
||||
|
||||
if msg != nil {
|
||||
send msg
|
||||
}
|
||||
|
||||
if timeout != nil {
|
||||
trigger timeout
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case message := <- msgCh:
|
||||
switch msg := message.(type) {
|
||||
case MessageProposal:
|
||||
|
||||
case MessageVote:
|
||||
if msg.Height == state.Height {
|
||||
newVote = votes.AddVote(msg)
|
||||
if newVote {
|
||||
switch msg.Type {
|
||||
case Prevote:
|
||||
prevotes = votes.Prevotes(msg.Round)
|
||||
if prevotes.WeakCertificate() and msg.Round > state.Round {
|
||||
event = EventNewRound { msg.Height, msg.Round }
|
||||
state, msg, timeout = Consensus(event, state)
|
||||
state = handleStateChange(state, msg, timeout)
|
||||
}
|
||||
|
||||
if blockID, ok = prevotes.TwoThirdsMajority(); ok and blockID != nil {
|
||||
if msg.Round == state.Round and hasBlock(blockID) {
|
||||
event = Majority23PrevotesBlock { msg.Height, msg.Round, blockID }
|
||||
state, msg, timeout = Consensus(event, state)
|
||||
state = handleStateChange(state, msg, timeout)
|
||||
}
|
||||
if proposal != nil and proposal.POLRound == msg.Round and hasBlock(blockID) {
|
||||
event = EventProposal {
|
||||
Height: state.Height
|
||||
Round: state.Round
|
||||
BlockID: blockID
|
||||
POLRound: proposal.POLRound
|
||||
Sender: message.Sender
|
||||
}
|
||||
state, msg, timeout = Consensus(event, state)
|
||||
state = handleStateChange(state, msg, timeout)
|
||||
}
|
||||
}
|
||||
|
||||
if prevotes.HasTwoThirdsAny() and msg.Round == state.Round {
|
||||
event = Majority23PrevotesAny { msg.Height, msg.Round, blockID }
|
||||
state, msg, timeout = Consensus(event, state)
|
||||
state = handleStateChange(state, msg, timeout)
|
||||
}
|
||||
|
||||
case Precommit:
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
case timeout := <- timeoutCh:
|
||||
|
||||
case block := <- blockCh:
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func handleStateChange(state, msg, timeout) State {
|
||||
if state.Step == Commit {
|
||||
state = ExecuteBlock(state.LockedValue)
|
||||
}
|
||||
if msg != nil {
|
||||
send msg
|
||||
}
|
||||
if timeout != nil {
|
||||
trigger timeout
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
### Implementation roadmap
|
||||
|
||||
* implement proposed implementation
|
||||
|
17
node/node.go
17
node/node.go
@@ -878,16 +878,20 @@ func createAndStartPrivValidatorSocketClient(
|
||||
listenAddr string,
|
||||
logger log.Logger,
|
||||
) (types.PrivValidator, error) {
|
||||
var pvsc types.PrivValidator
|
||||
var listener net.Listener
|
||||
|
||||
protocol, address := cmn.ProtocolAndAddress(listenAddr)
|
||||
ln, err := net.Listen(protocol, address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch protocol {
|
||||
case "unix":
|
||||
pvsc = privval.NewIPCVal(logger.With("module", "privval"), address)
|
||||
listener = privval.NewUnixListener(ln)
|
||||
case "tcp":
|
||||
// TODO: persist this key so external signer
|
||||
// can actually authenticate us
|
||||
pvsc = privval.NewTCPVal(logger.With("module", "privval"), listenAddr, ed25519.GenPrivKey())
|
||||
listener = privval.NewTCPListener(ln, ed25519.GenPrivKey())
|
||||
default:
|
||||
return nil, fmt.Errorf(
|
||||
"Wrong listen address: expected either 'tcp' or 'unix' protocols, got %s",
|
||||
@@ -895,10 +899,9 @@ func createAndStartPrivValidatorSocketClient(
|
||||
)
|
||||
}
|
||||
|
||||
if pvsc, ok := pvsc.(cmn.Service); ok {
|
||||
if err := pvsc.Start(); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to start")
|
||||
}
|
||||
pvsc := privval.NewSocketVal(logger.With("module", "privval"), listener)
|
||||
if err := pvsc.Start(); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to start")
|
||||
}
|
||||
|
||||
return pvsc, nil
|
||||
|
@@ -122,25 +122,25 @@ func TestNodeSetPrivValTCP(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
|
||||
config.BaseConfig.PrivValidatorListenAddr = addr
|
||||
|
||||
rs := privval.NewRemoteSigner(
|
||||
dialer := privval.DialTCPFn(addr, 100*time.Millisecond, ed25519.GenPrivKey())
|
||||
pvsc := privval.NewRemoteSigner(
|
||||
log.TestingLogger(),
|
||||
config.ChainID(),
|
||||
addr,
|
||||
types.NewMockPV(),
|
||||
ed25519.GenPrivKey(),
|
||||
dialer,
|
||||
)
|
||||
privval.RemoteSignerConnDeadline(5 * time.Millisecond)(rs)
|
||||
|
||||
go func() {
|
||||
err := rs.Start()
|
||||
err := pvsc.Start()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
defer rs.Stop()
|
||||
defer pvsc.Stop()
|
||||
|
||||
n, err := DefaultNewNode(config, log.TestingLogger())
|
||||
require.NoError(t, err)
|
||||
assert.IsType(t, &privval.TCPVal{}, n.PrivValidator())
|
||||
assert.IsType(t, &privval.SocketVal{}, n.PrivValidator())
|
||||
}
|
||||
|
||||
// address without a protocol must result in error
|
||||
@@ -161,25 +161,25 @@ func TestNodeSetPrivValIPC(t *testing.T) {
|
||||
config := cfg.ResetTestRoot("node_priv_val_tcp_test")
|
||||
config.BaseConfig.PrivValidatorListenAddr = "unix://" + tmpfile
|
||||
|
||||
rs := privval.NewIPCRemoteSigner(
|
||||
dialer := privval.DialUnixFn(tmpfile)
|
||||
pvsc := privval.NewRemoteSigner(
|
||||
log.TestingLogger(),
|
||||
config.ChainID(),
|
||||
tmpfile,
|
||||
types.NewMockPV(),
|
||||
dialer,
|
||||
)
|
||||
privval.IPCRemoteSignerConnDeadline(3 * time.Second)(rs)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
n, err := DefaultNewNode(config, log.TestingLogger())
|
||||
require.NoError(t, err)
|
||||
assert.IsType(t, &privval.IPCVal{}, n.PrivValidator())
|
||||
assert.IsType(t, &privval.SocketVal{}, n.PrivValidator())
|
||||
}()
|
||||
|
||||
err := rs.Start()
|
||||
err := pvsc.Start()
|
||||
require.NoError(t, err)
|
||||
defer rs.Stop()
|
||||
defer pvsc.Stop()
|
||||
|
||||
<-done
|
||||
}
|
||||
|
@@ -4,8 +4,8 @@ The p2p package provides an abstraction around peer-to-peer communication.
|
||||
|
||||
Docs:
|
||||
|
||||
- [Connection](https://github.com/tendermint/tendermint/blob/master/docs/spec/docs/spec/p2p/connection.md) for details on how connections and multiplexing work
|
||||
- [Peer](https://github.com/tendermint/tendermint/blob/master/docs/spec/docs/spec/p2p/peer.md) for details on peer ID, handshakes, and peer exchange
|
||||
- [Node](https://github.com/tendermint/tendermint/blob/master/docs/spec/docs/spec/p2p/node.md) for details about different types of nodes and how they should work
|
||||
- [Pex](https://github.com/tendermint/tendermint/blob/master/docs/spec/docs/spec/reactors/pex/pex.md) for details on peer discovery and exchange
|
||||
- [Config](https://github.com/tendermint/tendermint/blob/master/docs/spec/docs/spec/p2p/config.md) for details on some config option
|
||||
- [Connection](https://github.com/tendermint/tendermint/blob/master/docs/spec/p2p/connection.md) for details on how connections and multiplexing work
|
||||
- [Peer](https://github.com/tendermint/tendermint/blob/master/docs/spec/p2p/peer.md) for details on peer ID, handshakes, and peer exchange
|
||||
- [Node](https://github.com/tendermint/tendermint/blob/master/docs/spec/p2p/node.md) for details about different types of nodes and how they should work
|
||||
- [Pex](https://github.com/tendermint/tendermint/blob/master/docs/spec/reactors/pex/pex.md) for details on peer discovery and exchange
|
||||
- [Config](https://github.com/tendermint/tendermint/blob/master/docs/spec/p2p/config.md) for details on some config option
|
||||
|
@@ -8,6 +8,7 @@ import (
|
||||
"errors"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/crypto/chacha20poly1305"
|
||||
@@ -27,20 +28,36 @@ const aeadSizeOverhead = 16 // overhead of poly 1305 authentication tag
|
||||
const aeadKeySize = chacha20poly1305.KeySize
|
||||
const aeadNonceSize = chacha20poly1305.NonceSize
|
||||
|
||||
// SecretConnection implements net.conn.
|
||||
// SecretConnection implements net.Conn.
|
||||
// It is an implementation of the STS protocol.
|
||||
// Note we do not (yet) assume that a remote peer's pubkey
|
||||
// is known ahead of time, and thus we are technically
|
||||
// still vulnerable to MITM. (TODO!)
|
||||
// See docs/sts-final.pdf for more info
|
||||
// See https://github.com/tendermint/tendermint/blob/0.1/docs/sts-final.pdf for
|
||||
// details on the protocol.
|
||||
//
|
||||
// Consumers of the SecretConnection are responsible for authenticating
|
||||
// the remote peer's pubkey against known information, like a nodeID.
|
||||
// Otherwise they are vulnerable to MITM.
|
||||
// (TODO(ismail): see also https://github.com/tendermint/tendermint/issues/3010)
|
||||
type SecretConnection struct {
|
||||
conn io.ReadWriteCloser
|
||||
recvBuffer []byte
|
||||
recvNonce *[aeadNonceSize]byte
|
||||
sendNonce *[aeadNonceSize]byte
|
||||
|
||||
// immutable
|
||||
recvSecret *[aeadKeySize]byte
|
||||
sendSecret *[aeadKeySize]byte
|
||||
remPubKey crypto.PubKey
|
||||
conn io.ReadWriteCloser
|
||||
|
||||
// net.Conn must be thread safe:
|
||||
// https://golang.org/pkg/net/#Conn.
|
||||
// Since we have internal mutable state,
|
||||
// we need mtxs. But recv and send states
|
||||
// are independent, so we can use two mtxs.
|
||||
// All .Read are covered by recvMtx,
|
||||
// all .Write are covered by sendMtx.
|
||||
recvMtx sync.Mutex
|
||||
recvBuffer []byte
|
||||
recvNonce *[aeadNonceSize]byte
|
||||
|
||||
sendMtx sync.Mutex
|
||||
sendNonce *[aeadNonceSize]byte
|
||||
}
|
||||
|
||||
// MakeSecretConnection performs handshake and returns a new authenticated
|
||||
@@ -109,9 +126,12 @@ func (sc *SecretConnection) RemotePubKey() crypto.PubKey {
|
||||
return sc.remPubKey
|
||||
}
|
||||
|
||||
// Writes encrypted frames of `sealedFrameSize`
|
||||
// CONTRACT: data smaller than dataMaxSize is read atomically.
|
||||
// Writes encrypted frames of `totalFrameSize + aeadSizeOverhead`.
|
||||
// CONTRACT: data smaller than dataMaxSize is written atomically.
|
||||
func (sc *SecretConnection) Write(data []byte) (n int, err error) {
|
||||
sc.sendMtx.Lock()
|
||||
defer sc.sendMtx.Unlock()
|
||||
|
||||
for 0 < len(data) {
|
||||
var frame = make([]byte, totalFrameSize)
|
||||
var chunk []byte
|
||||
@@ -130,6 +150,7 @@ func (sc *SecretConnection) Write(data []byte) (n int, err error) {
|
||||
if err != nil {
|
||||
return n, errors.New("Invalid SecretConnection Key")
|
||||
}
|
||||
|
||||
// encrypt the frame
|
||||
var sealedFrame = make([]byte, aeadSizeOverhead+totalFrameSize)
|
||||
aead.Seal(sealedFrame[:0], sc.sendNonce[:], frame, nil)
|
||||
@@ -147,23 +168,30 @@ func (sc *SecretConnection) Write(data []byte) (n int, err error) {
|
||||
|
||||
// CONTRACT: data smaller than dataMaxSize is read atomically.
|
||||
func (sc *SecretConnection) Read(data []byte) (n int, err error) {
|
||||
sc.recvMtx.Lock()
|
||||
defer sc.recvMtx.Unlock()
|
||||
|
||||
// read off and update the recvBuffer, if non-empty
|
||||
if 0 < len(sc.recvBuffer) {
|
||||
n = copy(data, sc.recvBuffer)
|
||||
sc.recvBuffer = sc.recvBuffer[n:]
|
||||
return
|
||||
}
|
||||
|
||||
// read off the conn
|
||||
sealedFrame := make([]byte, totalFrameSize+aeadSizeOverhead)
|
||||
_, err = io.ReadFull(sc.conn, sealedFrame)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
aead, err := chacha20poly1305.New(sc.recvSecret[:])
|
||||
if err != nil {
|
||||
return n, errors.New("Invalid SecretConnection Key")
|
||||
}
|
||||
sealedFrame := make([]byte, totalFrameSize+aeadSizeOverhead)
|
||||
_, err = io.ReadFull(sc.conn, sealedFrame)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// decrypt the frame
|
||||
// decrypt the frame.
|
||||
// reads and updates the sc.recvNonce
|
||||
var frame = make([]byte, totalFrameSize)
|
||||
_, err = aead.Open(frame[:0], sc.recvNonce[:], sealedFrame, nil)
|
||||
if err != nil {
|
||||
@@ -172,12 +200,13 @@ func (sc *SecretConnection) Read(data []byte) (n int, err error) {
|
||||
incrNonce(sc.recvNonce)
|
||||
// end decryption
|
||||
|
||||
// copy checkLength worth into data,
|
||||
// set recvBuffer to the rest.
|
||||
var chunkLength = binary.LittleEndian.Uint32(frame) // read the first four bytes
|
||||
if chunkLength > dataMaxSize {
|
||||
return 0, errors.New("chunkLength is greater than dataMaxSize")
|
||||
}
|
||||
var chunk = frame[dataLenSize : dataLenSize+chunkLength]
|
||||
|
||||
n = copy(data, chunk)
|
||||
sc.recvBuffer = chunk[n:]
|
||||
return
|
||||
|
@@ -7,10 +7,12 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
@@ -98,6 +100,69 @@ func TestSecretConnectionHandshake(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrentWrite(t *testing.T) {
|
||||
fooSecConn, barSecConn := makeSecretConnPair(t)
|
||||
fooWriteText := cmn.RandStr(dataMaxSize)
|
||||
|
||||
// write from two routines.
|
||||
// should be safe from race according to net.Conn:
|
||||
// https://golang.org/pkg/net/#Conn
|
||||
n := 100
|
||||
wg := new(sync.WaitGroup)
|
||||
wg.Add(3)
|
||||
go writeLots(t, wg, fooSecConn, fooWriteText, n)
|
||||
go writeLots(t, wg, fooSecConn, fooWriteText, n)
|
||||
|
||||
// Consume reads from bar's reader
|
||||
readLots(t, wg, barSecConn, n*2)
|
||||
wg.Wait()
|
||||
|
||||
if err := fooSecConn.Close(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrentRead(t *testing.T) {
|
||||
fooSecConn, barSecConn := makeSecretConnPair(t)
|
||||
fooWriteText := cmn.RandStr(dataMaxSize)
|
||||
n := 100
|
||||
|
||||
// read from two routines.
|
||||
// should be safe from race according to net.Conn:
|
||||
// https://golang.org/pkg/net/#Conn
|
||||
wg := new(sync.WaitGroup)
|
||||
wg.Add(3)
|
||||
go readLots(t, wg, fooSecConn, n/2)
|
||||
go readLots(t, wg, fooSecConn, n/2)
|
||||
|
||||
// write to bar
|
||||
writeLots(t, wg, barSecConn, fooWriteText, n)
|
||||
wg.Wait()
|
||||
|
||||
if err := fooSecConn.Close(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
func writeLots(t *testing.T, wg *sync.WaitGroup, conn net.Conn, txt string, n int) {
|
||||
defer wg.Done()
|
||||
for i := 0; i < n; i++ {
|
||||
_, err := conn.Write([]byte(txt))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to write to fooSecConn: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func readLots(t *testing.T, wg *sync.WaitGroup, conn net.Conn, n int) {
|
||||
readBuffer := make([]byte, dataMaxSize)
|
||||
for i := 0; i < n; i++ {
|
||||
_, err := conn.Read(readBuffer)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
func TestSecretConnectionReadWrite(t *testing.T) {
|
||||
fooConn, barConn := makeKVStoreConnPair()
|
||||
fooWrites, barWrites := []string{}, []string{}
|
||||
|
263
privval/client.go
Normal file
263
privval/client.go
Normal file
@@ -0,0 +1,263 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultConnHeartBeatSeconds = 2
|
||||
defaultDialRetries = 10
|
||||
)
|
||||
|
||||
// Socket errors.
|
||||
var (
|
||||
ErrUnexpectedResponse = errors.New("received unexpected response")
|
||||
)
|
||||
|
||||
var (
|
||||
connHeartbeat = time.Second * defaultConnHeartBeatSeconds
|
||||
)
|
||||
|
||||
// SocketValOption sets an optional parameter on the SocketVal.
|
||||
type SocketValOption func(*SocketVal)
|
||||
|
||||
// SocketValHeartbeat sets the period on which to check the liveness of the
|
||||
// connected Signer connections.
|
||||
func SocketValHeartbeat(period time.Duration) SocketValOption {
|
||||
return func(sc *SocketVal) { sc.connHeartbeat = period }
|
||||
}
|
||||
|
||||
// SocketVal implements PrivValidator.
|
||||
// It listens for an external process to dial in and uses
|
||||
// the socket to request signatures.
|
||||
type SocketVal struct {
|
||||
cmn.BaseService
|
||||
|
||||
listener net.Listener
|
||||
|
||||
// ping
|
||||
cancelPing chan struct{}
|
||||
pingTicker *time.Ticker
|
||||
connHeartbeat time.Duration
|
||||
|
||||
// signer is mutable since it can be
|
||||
// reset if the connection fails.
|
||||
// failures are detected by a background
|
||||
// ping routine.
|
||||
// Methods on the underlying net.Conn itself
|
||||
// are already gorountine safe.
|
||||
mtx sync.RWMutex
|
||||
signer *RemoteSignerClient
|
||||
}
|
||||
|
||||
// Check that SocketVal implements PrivValidator.
|
||||
var _ types.PrivValidator = (*SocketVal)(nil)
|
||||
|
||||
// NewSocketVal returns an instance of SocketVal.
|
||||
func NewSocketVal(
|
||||
logger log.Logger,
|
||||
listener net.Listener,
|
||||
) *SocketVal {
|
||||
sc := &SocketVal{
|
||||
listener: listener,
|
||||
connHeartbeat: connHeartbeat,
|
||||
}
|
||||
|
||||
sc.BaseService = *cmn.NewBaseService(logger, "SocketVal", sc)
|
||||
|
||||
return sc
|
||||
}
|
||||
|
||||
//--------------------------------------------------------
|
||||
// Implement PrivValidator
|
||||
|
||||
// GetPubKey implements PrivValidator.
|
||||
func (sc *SocketVal) GetPubKey() crypto.PubKey {
|
||||
sc.mtx.RLock()
|
||||
defer sc.mtx.RUnlock()
|
||||
return sc.signer.GetPubKey()
|
||||
}
|
||||
|
||||
// SignVote implements PrivValidator.
|
||||
func (sc *SocketVal) SignVote(chainID string, vote *types.Vote) error {
|
||||
sc.mtx.RLock()
|
||||
defer sc.mtx.RUnlock()
|
||||
return sc.signer.SignVote(chainID, vote)
|
||||
}
|
||||
|
||||
// SignProposal implements PrivValidator.
|
||||
func (sc *SocketVal) SignProposal(chainID string, proposal *types.Proposal) error {
|
||||
sc.mtx.RLock()
|
||||
defer sc.mtx.RUnlock()
|
||||
return sc.signer.SignProposal(chainID, proposal)
|
||||
}
|
||||
|
||||
//--------------------------------------------------------
|
||||
// More thread safe methods proxied to the signer
|
||||
|
||||
// Ping is used to check connection health.
|
||||
func (sc *SocketVal) Ping() error {
|
||||
sc.mtx.RLock()
|
||||
defer sc.mtx.RUnlock()
|
||||
return sc.signer.Ping()
|
||||
}
|
||||
|
||||
// Close closes the underlying net.Conn.
|
||||
func (sc *SocketVal) Close() {
|
||||
sc.mtx.RLock()
|
||||
defer sc.mtx.RUnlock()
|
||||
if sc.signer != nil {
|
||||
if err := sc.signer.Close(); err != nil {
|
||||
sc.Logger.Error("OnStop", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
if sc.listener != nil {
|
||||
if err := sc.listener.Close(); err != nil {
|
||||
sc.Logger.Error("OnStop", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//--------------------------------------------------------
|
||||
// Service start and stop
|
||||
|
||||
// OnStart implements cmn.Service.
|
||||
func (sc *SocketVal) OnStart() error {
|
||||
if closed, err := sc.reset(); err != nil {
|
||||
sc.Logger.Error("OnStart", "err", err)
|
||||
return err
|
||||
} else if closed {
|
||||
return fmt.Errorf("listener is closed")
|
||||
}
|
||||
|
||||
// Start a routine to keep the connection alive
|
||||
sc.cancelPing = make(chan struct{}, 1)
|
||||
sc.pingTicker = time.NewTicker(sc.connHeartbeat)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-sc.pingTicker.C:
|
||||
err := sc.Ping()
|
||||
if err != nil {
|
||||
sc.Logger.Error("Ping", "err", err)
|
||||
if err == ErrUnexpectedResponse {
|
||||
return
|
||||
}
|
||||
|
||||
closed, err := sc.reset()
|
||||
if err != nil {
|
||||
sc.Logger.Error("Reconnecting to remote signer failed", "err", err)
|
||||
continue
|
||||
}
|
||||
if closed {
|
||||
sc.Logger.Info("listener is closing")
|
||||
return
|
||||
}
|
||||
|
||||
sc.Logger.Info("Re-created connection to remote signer", "impl", sc)
|
||||
}
|
||||
case <-sc.cancelPing:
|
||||
sc.pingTicker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements cmn.Service.
|
||||
func (sc *SocketVal) OnStop() {
|
||||
if sc.cancelPing != nil {
|
||||
close(sc.cancelPing)
|
||||
}
|
||||
sc.Close()
|
||||
}
|
||||
|
||||
//--------------------------------------------------------
|
||||
// Connection and signer management
|
||||
|
||||
// waits to accept and sets a new connection.
|
||||
// connection is closed in OnStop.
|
||||
// returns true if the listener is closed
|
||||
// (ie. it returns a nil conn).
|
||||
func (sc *SocketVal) reset() (bool, error) {
|
||||
sc.mtx.Lock()
|
||||
defer sc.mtx.Unlock()
|
||||
|
||||
// first check if the conn already exists and close it.
|
||||
if sc.signer != nil {
|
||||
if err := sc.signer.Close(); err != nil {
|
||||
sc.Logger.Error("error closing connection", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// wait for a new conn
|
||||
conn, err := sc.waitConnection()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// listener is closed
|
||||
if conn == nil {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
sc.signer, err = NewRemoteSignerClient(conn)
|
||||
if err != nil {
|
||||
// failed to fetch the pubkey. close out the connection.
|
||||
if err := conn.Close(); err != nil {
|
||||
sc.Logger.Error("error closing connection", "err", err)
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (sc *SocketVal) acceptConnection() (net.Conn, error) {
|
||||
conn, err := sc.listener.Accept()
|
||||
if err != nil {
|
||||
if !sc.IsRunning() {
|
||||
return nil, nil // Ignore error from listener closing.
|
||||
}
|
||||
return nil, err
|
||||
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// waitConnection uses the configured wait timeout to error if no external
|
||||
// process connects in the time period.
|
||||
func (sc *SocketVal) waitConnection() (net.Conn, error) {
|
||||
var (
|
||||
connc = make(chan net.Conn, 1)
|
||||
errc = make(chan error, 1)
|
||||
)
|
||||
|
||||
go func(connc chan<- net.Conn, errc chan<- error) {
|
||||
conn, err := sc.acceptConnection()
|
||||
if err != nil {
|
||||
errc <- err
|
||||
return
|
||||
}
|
||||
|
||||
connc <- conn
|
||||
}(connc, errc)
|
||||
|
||||
select {
|
||||
case conn := <-connc:
|
||||
return conn, nil
|
||||
case err := <-errc:
|
||||
return nil, err
|
||||
}
|
||||
}
|
@@ -17,6 +17,16 @@ import (
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
var (
|
||||
testAcceptDeadline = defaultAcceptDeadlineSeconds * time.Second
|
||||
|
||||
testConnDeadline = 100 * time.Millisecond
|
||||
testConnDeadline2o3 = 66 * time.Millisecond // 2/3 of the other one
|
||||
|
||||
testHeartbeatTimeout = 10 * time.Millisecond
|
||||
testHeartbeatTimeout3o2 = 6 * time.Millisecond // 3/2 of the other one
|
||||
)
|
||||
|
||||
func TestSocketPVAddress(t *testing.T) {
|
||||
var (
|
||||
chainID = cmn.RandStr(12)
|
||||
@@ -39,8 +49,7 @@ func TestSocketPVPubKey(t *testing.T) {
|
||||
defer sc.Stop()
|
||||
defer rs.Stop()
|
||||
|
||||
clientKey, err := sc.getPubKey()
|
||||
require.NoError(t, err)
|
||||
clientKey := sc.GetPubKey()
|
||||
|
||||
privvalPubKey := rs.privVal.GetPubKey()
|
||||
|
||||
@@ -95,14 +104,14 @@ func TestSocketPVVoteResetDeadline(t *testing.T) {
|
||||
defer sc.Stop()
|
||||
defer rs.Stop()
|
||||
|
||||
time.Sleep(3 * time.Millisecond)
|
||||
time.Sleep(testConnDeadline2o3)
|
||||
|
||||
require.NoError(t, rs.privVal.SignVote(chainID, want))
|
||||
require.NoError(t, sc.SignVote(chainID, have))
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
|
||||
// This would exceed the deadline if it was not extended by the previous message
|
||||
time.Sleep(3 * time.Millisecond)
|
||||
time.Sleep(testConnDeadline2o3)
|
||||
|
||||
require.NoError(t, rs.privVal.SignVote(chainID, want))
|
||||
require.NoError(t, sc.SignVote(chainID, have))
|
||||
@@ -122,7 +131,7 @@ func TestSocketPVVoteKeepalive(t *testing.T) {
|
||||
defer sc.Stop()
|
||||
defer rs.Stop()
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
time.Sleep(testConnDeadline * 2)
|
||||
|
||||
require.NoError(t, rs.privVal.SignVote(chainID, want))
|
||||
require.NoError(t, sc.SignVote(chainID, have))
|
||||
@@ -131,18 +140,13 @@ func TestSocketPVVoteKeepalive(t *testing.T) {
|
||||
|
||||
func TestSocketPVDeadline(t *testing.T) {
|
||||
var (
|
||||
addr = testFreeAddr(t)
|
||||
listenc = make(chan struct{})
|
||||
sc = NewTCPVal(
|
||||
log.TestingLogger(),
|
||||
addr,
|
||||
ed25519.GenPrivKey(),
|
||||
)
|
||||
addr = testFreeAddr(t)
|
||||
listenc = make(chan struct{})
|
||||
thisConnTimeout = 100 * time.Millisecond
|
||||
sc = newSocketVal(log.TestingLogger(), addr, thisConnTimeout)
|
||||
)
|
||||
|
||||
TCPValConnTimeout(100 * time.Millisecond)(sc)
|
||||
|
||||
go func(sc *TCPVal) {
|
||||
go func(sc *SocketVal) {
|
||||
defer close(listenc)
|
||||
|
||||
assert.Equal(t, sc.Start().(cmn.Error).Data(), ErrConnTimeout)
|
||||
@@ -199,9 +203,8 @@ func TestRemoteSignerRetry(t *testing.T) {
|
||||
rs := NewRemoteSigner(
|
||||
log.TestingLogger(),
|
||||
cmn.RandStr(12),
|
||||
ln.Addr().String(),
|
||||
types.NewMockPV(),
|
||||
ed25519.GenPrivKey(),
|
||||
DialTCPFn(ln.Addr().String(), testConnDeadline, ed25519.GenPrivKey()),
|
||||
)
|
||||
defer rs.Stop()
|
||||
|
||||
@@ -230,15 +233,8 @@ func TestRemoteSignVoteErrors(t *testing.T) {
|
||||
defer sc.Stop()
|
||||
defer rs.Stop()
|
||||
|
||||
err := writeMsg(sc.conn, &SignVoteRequest{Vote: vote})
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := readMsg(sc.conn)
|
||||
require.NoError(t, err)
|
||||
|
||||
resp := *res.(*SignedVoteResponse)
|
||||
require.NotNil(t, resp.Error)
|
||||
require.Equal(t, resp.Error.Description, types.ErroringMockPVErr.Error())
|
||||
err := sc.SignVote("", vote)
|
||||
require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
|
||||
|
||||
err = rs.privVal.SignVote(chainID, vote)
|
||||
require.Error(t, err)
|
||||
@@ -257,15 +253,8 @@ func TestRemoteSignProposalErrors(t *testing.T) {
|
||||
defer sc.Stop()
|
||||
defer rs.Stop()
|
||||
|
||||
err := writeMsg(sc.conn, &SignProposalRequest{Proposal: proposal})
|
||||
require.NoError(t, err)
|
||||
|
||||
res, err := readMsg(sc.conn)
|
||||
require.NoError(t, err)
|
||||
|
||||
resp := *res.(*SignedProposalResponse)
|
||||
require.NotNil(t, resp.Error)
|
||||
require.Equal(t, resp.Error.Description, types.ErroringMockPVErr.Error())
|
||||
err := sc.SignProposal("", proposal)
|
||||
require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
|
||||
|
||||
err = rs.privVal.SignProposal(chainID, proposal)
|
||||
require.Error(t, err)
|
||||
@@ -285,15 +274,10 @@ func TestErrUnexpectedResponse(t *testing.T) {
|
||||
rs = NewRemoteSigner(
|
||||
logger,
|
||||
chainID,
|
||||
addr,
|
||||
types.NewMockPV(),
|
||||
ed25519.GenPrivKey(),
|
||||
)
|
||||
sc = NewTCPVal(
|
||||
logger,
|
||||
addr,
|
||||
ed25519.GenPrivKey(),
|
||||
DialTCPFn(addr, testConnDeadline, ed25519.GenPrivKey()),
|
||||
)
|
||||
sc = newSocketVal(logger, addr, testConnDeadline)
|
||||
)
|
||||
|
||||
testStartSocketPV(t, readyc, sc)
|
||||
@@ -331,11 +315,73 @@ func TestErrUnexpectedResponse(t *testing.T) {
|
||||
require.Equal(t, err, ErrUnexpectedResponse)
|
||||
}
|
||||
|
||||
func TestRetryTCPConnToRemoteSigner(t *testing.T) {
|
||||
var (
|
||||
addr = testFreeAddr(t)
|
||||
logger = log.TestingLogger()
|
||||
chainID = cmn.RandStr(12)
|
||||
readyc = make(chan struct{})
|
||||
|
||||
rs = NewRemoteSigner(
|
||||
logger,
|
||||
chainID,
|
||||
types.NewMockPV(),
|
||||
DialTCPFn(addr, testConnDeadline, ed25519.GenPrivKey()),
|
||||
)
|
||||
thisConnTimeout = testConnDeadline
|
||||
sc = newSocketVal(logger, addr, thisConnTimeout)
|
||||
)
|
||||
// Ping every:
|
||||
SocketValHeartbeat(testHeartbeatTimeout)(sc)
|
||||
|
||||
RemoteSignerConnDeadline(testConnDeadline)(rs)
|
||||
RemoteSignerConnRetries(10)(rs)
|
||||
|
||||
testStartSocketPV(t, readyc, sc)
|
||||
defer sc.Stop()
|
||||
require.NoError(t, rs.Start())
|
||||
assert.True(t, rs.IsRunning())
|
||||
|
||||
<-readyc
|
||||
time.Sleep(testHeartbeatTimeout * 2)
|
||||
|
||||
rs.Stop()
|
||||
rs2 := NewRemoteSigner(
|
||||
logger,
|
||||
chainID,
|
||||
types.NewMockPV(),
|
||||
DialTCPFn(addr, testConnDeadline, ed25519.GenPrivKey()),
|
||||
)
|
||||
// let some pings pass
|
||||
time.Sleep(testHeartbeatTimeout3o2)
|
||||
require.NoError(t, rs2.Start())
|
||||
assert.True(t, rs2.IsRunning())
|
||||
defer rs2.Stop()
|
||||
|
||||
// give the client some time to re-establish the conn to the remote signer
|
||||
// should see sth like this in the logs:
|
||||
//
|
||||
// E[10016-01-10|17:12:46.128] Ping err="remote signer timed out"
|
||||
// I[10016-01-10|17:16:42.447] Re-created connection to remote signer impl=SocketVal
|
||||
time.Sleep(testConnDeadline * 2)
|
||||
}
|
||||
|
||||
func newSocketVal(logger log.Logger, addr string, connDeadline time.Duration) *SocketVal {
|
||||
ln, err := net.Listen(cmn.ProtocolAndAddress(addr))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
tcpLn := NewTCPListener(ln, ed25519.GenPrivKey())
|
||||
TCPListenerAcceptDeadline(testAcceptDeadline)(tcpLn)
|
||||
TCPListenerConnDeadline(testConnDeadline)(tcpLn)
|
||||
return NewSocketVal(logger, tcpLn)
|
||||
}
|
||||
|
||||
func testSetupSocketPair(
|
||||
t *testing.T,
|
||||
chainID string,
|
||||
privValidator types.PrivValidator,
|
||||
) (*TCPVal, *RemoteSigner) {
|
||||
) (*SocketVal, *RemoteSigner) {
|
||||
var (
|
||||
addr = testFreeAddr(t)
|
||||
logger = log.TestingLogger()
|
||||
@@ -344,20 +390,16 @@ func testSetupSocketPair(
|
||||
rs = NewRemoteSigner(
|
||||
logger,
|
||||
chainID,
|
||||
addr,
|
||||
privVal,
|
||||
ed25519.GenPrivKey(),
|
||||
)
|
||||
sc = NewTCPVal(
|
||||
logger,
|
||||
addr,
|
||||
ed25519.GenPrivKey(),
|
||||
DialTCPFn(addr, testConnDeadline, ed25519.GenPrivKey()),
|
||||
)
|
||||
|
||||
thisConnTimeout = testConnDeadline
|
||||
sc = newSocketVal(logger, addr, thisConnTimeout)
|
||||
)
|
||||
|
||||
TCPValConnTimeout(5 * time.Millisecond)(sc)
|
||||
TCPValHeartbeat(2 * time.Millisecond)(sc)
|
||||
RemoteSignerConnDeadline(5 * time.Millisecond)(rs)
|
||||
SocketValHeartbeat(testHeartbeatTimeout)(sc)
|
||||
RemoteSignerConnDeadline(testConnDeadline)(rs)
|
||||
RemoteSignerConnRetries(1e6)(rs)
|
||||
|
||||
testStartSocketPV(t, readyc, sc)
|
||||
@@ -378,8 +420,8 @@ func testReadWriteResponse(t *testing.T, resp RemoteSignerMsg, rsConn net.Conn)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func testStartSocketPV(t *testing.T, readyc chan struct{}, sc *TCPVal) {
|
||||
go func(sc *TCPVal) {
|
||||
func testStartSocketPV(t *testing.T, readyc chan struct{}, sc *SocketVal) {
|
||||
go func(sc *SocketVal) {
|
||||
require.NoError(t, sc.Start())
|
||||
assert.True(t, sc.IsRunning())
|
||||
|
21
privval/doc.go
Normal file
21
privval/doc.go
Normal file
@@ -0,0 +1,21 @@
|
||||
/*
|
||||
|
||||
Package privval provides different implementations of the types.PrivValidator.
|
||||
|
||||
FilePV
|
||||
|
||||
FilePV is the simplest implementation and developer default. It uses one file for the private key and another to store state.
|
||||
|
||||
SocketVal
|
||||
|
||||
SocketVal establishes a connection to an external process, like a Key Management Server (KMS), using a socket.
|
||||
SocketVal listens for the external KMS process to dial in.
|
||||
SocketVal takes a listener, which determines the type of connection
|
||||
(ie. encrypted over tcp, or unencrypted over unix).
|
||||
|
||||
RemoteSigner
|
||||
|
||||
RemoteSigner is a simple wrapper around a net.Conn. It's used by both IPCVal and TCPVal.
|
||||
|
||||
*/
|
||||
package privval
|
@@ -22,6 +22,7 @@ const (
|
||||
stepPrecommit int8 = 3
|
||||
)
|
||||
|
||||
// A vote is either stepPrevote or stepPrecommit.
|
||||
func voteToStep(vote *types.Vote) int8 {
|
||||
switch vote.Type {
|
||||
case types.PrevoteType:
|
||||
@@ -29,7 +30,7 @@ func voteToStep(vote *types.Vote) int8 {
|
||||
case types.PrecommitType:
|
||||
return stepPrecommit
|
||||
default:
|
||||
cmn.PanicSanity("Unknown vote type")
|
||||
panic("Unknown vote type")
|
||||
return 0
|
||||
}
|
||||
}
|
123
privval/ipc.go
123
privval/ipc.go
@@ -1,123 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// IPCValOption sets an optional parameter on the SocketPV.
|
||||
type IPCValOption func(*IPCVal)
|
||||
|
||||
// IPCValConnTimeout sets the read and write timeout for connections
|
||||
// from external signing processes.
|
||||
func IPCValConnTimeout(timeout time.Duration) IPCValOption {
|
||||
return func(sc *IPCVal) { sc.connTimeout = timeout }
|
||||
}
|
||||
|
||||
// IPCValHeartbeat sets the period on which to check the liveness of the
|
||||
// connected Signer connections.
|
||||
func IPCValHeartbeat(period time.Duration) IPCValOption {
|
||||
return func(sc *IPCVal) { sc.connHeartbeat = period }
|
||||
}
|
||||
|
||||
// IPCVal implements PrivValidator, it uses a unix socket to request signatures
|
||||
// from an external process.
|
||||
type IPCVal struct {
|
||||
cmn.BaseService
|
||||
*RemoteSignerClient
|
||||
|
||||
addr string
|
||||
|
||||
connTimeout time.Duration
|
||||
connHeartbeat time.Duration
|
||||
|
||||
conn net.Conn
|
||||
cancelPing chan struct{}
|
||||
pingTicker *time.Ticker
|
||||
}
|
||||
|
||||
// Check that IPCVal implements PrivValidator.
|
||||
var _ types.PrivValidator = (*IPCVal)(nil)
|
||||
|
||||
// NewIPCVal returns an instance of IPCVal.
|
||||
func NewIPCVal(
|
||||
logger log.Logger,
|
||||
socketAddr string,
|
||||
) *IPCVal {
|
||||
sc := &IPCVal{
|
||||
addr: socketAddr,
|
||||
connTimeout: connTimeout,
|
||||
connHeartbeat: connHeartbeat,
|
||||
}
|
||||
|
||||
sc.BaseService = *cmn.NewBaseService(logger, "IPCVal", sc)
|
||||
|
||||
return sc
|
||||
}
|
||||
|
||||
// OnStart implements cmn.Service.
|
||||
func (sc *IPCVal) OnStart() error {
|
||||
err := sc.connect()
|
||||
if err != nil {
|
||||
sc.Logger.Error("OnStart", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
sc.RemoteSignerClient, err = NewRemoteSignerClient(sc.conn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start a routine to keep the connection alive
|
||||
sc.cancelPing = make(chan struct{}, 1)
|
||||
sc.pingTicker = time.NewTicker(sc.connHeartbeat)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-sc.pingTicker.C:
|
||||
err := sc.Ping()
|
||||
if err != nil {
|
||||
sc.Logger.Error("Ping", "err", err)
|
||||
}
|
||||
case <-sc.cancelPing:
|
||||
sc.pingTicker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements cmn.Service.
|
||||
func (sc *IPCVal) OnStop() {
|
||||
if sc.cancelPing != nil {
|
||||
close(sc.cancelPing)
|
||||
}
|
||||
|
||||
if sc.conn != nil {
|
||||
if err := sc.conn.Close(); err != nil {
|
||||
sc.Logger.Error("OnStop", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sc *IPCVal) connect() error {
|
||||
la, err := net.ResolveUnixAddr("unix", sc.addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn, err := net.DialUnix("unix", nil, la)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sc.conn = newTimeoutConn(conn, sc.connTimeout)
|
||||
|
||||
return nil
|
||||
}
|
@@ -1,132 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// IPCRemoteSignerOption sets an optional parameter on the IPCRemoteSigner.
|
||||
type IPCRemoteSignerOption func(*IPCRemoteSigner)
|
||||
|
||||
// IPCRemoteSignerConnDeadline sets the read and write deadline for connections
|
||||
// from external signing processes.
|
||||
func IPCRemoteSignerConnDeadline(deadline time.Duration) IPCRemoteSignerOption {
|
||||
return func(ss *IPCRemoteSigner) { ss.connDeadline = deadline }
|
||||
}
|
||||
|
||||
// IPCRemoteSignerConnRetries sets the amount of attempted retries to connect.
|
||||
func IPCRemoteSignerConnRetries(retries int) IPCRemoteSignerOption {
|
||||
return func(ss *IPCRemoteSigner) { ss.connRetries = retries }
|
||||
}
|
||||
|
||||
// IPCRemoteSigner is a RPC implementation of PrivValidator that listens on a unix socket.
|
||||
type IPCRemoteSigner struct {
|
||||
cmn.BaseService
|
||||
|
||||
addr string
|
||||
chainID string
|
||||
connDeadline time.Duration
|
||||
connRetries int
|
||||
privVal types.PrivValidator
|
||||
|
||||
listener *net.UnixListener
|
||||
}
|
||||
|
||||
// NewIPCRemoteSigner returns an instance of IPCRemoteSigner.
|
||||
func NewIPCRemoteSigner(
|
||||
logger log.Logger,
|
||||
chainID, socketAddr string,
|
||||
privVal types.PrivValidator,
|
||||
) *IPCRemoteSigner {
|
||||
rs := &IPCRemoteSigner{
|
||||
addr: socketAddr,
|
||||
chainID: chainID,
|
||||
connDeadline: time.Second * defaultConnDeadlineSeconds,
|
||||
connRetries: defaultDialRetries,
|
||||
privVal: privVal,
|
||||
}
|
||||
|
||||
rs.BaseService = *cmn.NewBaseService(logger, "IPCRemoteSigner", rs)
|
||||
|
||||
return rs
|
||||
}
|
||||
|
||||
// OnStart implements cmn.Service.
|
||||
func (rs *IPCRemoteSigner) OnStart() error {
|
||||
err := rs.listen()
|
||||
if err != nil {
|
||||
err = cmn.ErrorWrap(err, "listen")
|
||||
rs.Logger.Error("OnStart", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
conn, err := rs.listener.AcceptUnix()
|
||||
if err != nil {
|
||||
rs.Logger.Error("AcceptUnix", "err", err)
|
||||
return
|
||||
}
|
||||
go rs.handleConnection(conn)
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements cmn.Service.
|
||||
func (rs *IPCRemoteSigner) OnStop() {
|
||||
if rs.listener != nil {
|
||||
if err := rs.listener.Close(); err != nil {
|
||||
rs.Logger.Error("OnStop", "err", cmn.ErrorWrap(err, "closing listener failed"))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rs *IPCRemoteSigner) listen() error {
|
||||
la, err := net.ResolveUnixAddr("unix", rs.addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rs.listener, err = net.ListenUnix("unix", la)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (rs *IPCRemoteSigner) handleConnection(conn net.Conn) {
|
||||
for {
|
||||
if !rs.IsRunning() {
|
||||
return // Ignore error from listener closing.
|
||||
}
|
||||
|
||||
// Reset the connection deadline
|
||||
conn.SetDeadline(time.Now().Add(rs.connDeadline))
|
||||
|
||||
req, err := readMsg(conn)
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
rs.Logger.Error("handleConnection", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
res, err := handleRequest(req, rs.chainID, rs.privVal)
|
||||
|
||||
if err != nil {
|
||||
// only log the error; we'll reply with an error in res
|
||||
rs.Logger.Error("handleConnection", "err", err)
|
||||
}
|
||||
|
||||
err = writeMsg(conn, res)
|
||||
if err != nil {
|
||||
rs.Logger.Error("handleConnection", "err", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,147 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
func TestIPCPVVote(t *testing.T) {
|
||||
var (
|
||||
chainID = cmn.RandStr(12)
|
||||
sc, rs = testSetupIPCSocketPair(t, chainID, types.NewMockPV())
|
||||
|
||||
ts = time.Now()
|
||||
vType = types.PrecommitType
|
||||
want = &types.Vote{Timestamp: ts, Type: vType}
|
||||
have = &types.Vote{Timestamp: ts, Type: vType}
|
||||
)
|
||||
defer sc.Stop()
|
||||
defer rs.Stop()
|
||||
|
||||
require.NoError(t, rs.privVal.SignVote(chainID, want))
|
||||
require.NoError(t, sc.SignVote(chainID, have))
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}
|
||||
|
||||
func TestIPCPVVoteResetDeadline(t *testing.T) {
|
||||
var (
|
||||
chainID = cmn.RandStr(12)
|
||||
sc, rs = testSetupIPCSocketPair(t, chainID, types.NewMockPV())
|
||||
|
||||
ts = time.Now()
|
||||
vType = types.PrecommitType
|
||||
want = &types.Vote{Timestamp: ts, Type: vType}
|
||||
have = &types.Vote{Timestamp: ts, Type: vType}
|
||||
)
|
||||
defer sc.Stop()
|
||||
defer rs.Stop()
|
||||
|
||||
time.Sleep(3 * time.Millisecond)
|
||||
|
||||
require.NoError(t, rs.privVal.SignVote(chainID, want))
|
||||
require.NoError(t, sc.SignVote(chainID, have))
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
|
||||
// This would exceed the deadline if it was not extended by the previous message
|
||||
time.Sleep(3 * time.Millisecond)
|
||||
|
||||
require.NoError(t, rs.privVal.SignVote(chainID, want))
|
||||
require.NoError(t, sc.SignVote(chainID, have))
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}
|
||||
|
||||
func TestIPCPVVoteKeepalive(t *testing.T) {
|
||||
var (
|
||||
chainID = cmn.RandStr(12)
|
||||
sc, rs = testSetupIPCSocketPair(t, chainID, types.NewMockPV())
|
||||
|
||||
ts = time.Now()
|
||||
vType = types.PrecommitType
|
||||
want = &types.Vote{Timestamp: ts, Type: vType}
|
||||
have = &types.Vote{Timestamp: ts, Type: vType}
|
||||
)
|
||||
defer sc.Stop()
|
||||
defer rs.Stop()
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
require.NoError(t, rs.privVal.SignVote(chainID, want))
|
||||
require.NoError(t, sc.SignVote(chainID, have))
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}
|
||||
|
||||
func testSetupIPCSocketPair(
|
||||
t *testing.T,
|
||||
chainID string,
|
||||
privValidator types.PrivValidator,
|
||||
) (*IPCVal, *IPCRemoteSigner) {
|
||||
addr, err := testUnixAddr()
|
||||
require.NoError(t, err)
|
||||
|
||||
var (
|
||||
logger = log.TestingLogger()
|
||||
privVal = privValidator
|
||||
readyc = make(chan struct{})
|
||||
rs = NewIPCRemoteSigner(
|
||||
logger,
|
||||
chainID,
|
||||
addr,
|
||||
privVal,
|
||||
)
|
||||
sc = NewIPCVal(
|
||||
logger,
|
||||
addr,
|
||||
)
|
||||
)
|
||||
|
||||
IPCValConnTimeout(5 * time.Millisecond)(sc)
|
||||
IPCValHeartbeat(time.Millisecond)(sc)
|
||||
|
||||
IPCRemoteSignerConnDeadline(time.Millisecond * 5)(rs)
|
||||
|
||||
testStartIPCRemoteSigner(t, readyc, rs)
|
||||
|
||||
<-readyc
|
||||
|
||||
require.NoError(t, sc.Start())
|
||||
assert.True(t, sc.IsRunning())
|
||||
|
||||
return sc, rs
|
||||
}
|
||||
|
||||
func testStartIPCRemoteSigner(t *testing.T, readyc chan struct{}, rs *IPCRemoteSigner) {
|
||||
go func(rs *IPCRemoteSigner) {
|
||||
require.NoError(t, rs.Start())
|
||||
assert.True(t, rs.IsRunning())
|
||||
|
||||
readyc <- struct{}{}
|
||||
}(rs)
|
||||
}
|
||||
|
||||
func testUnixAddr() (string, error) {
|
||||
f, err := ioutil.TempFile("/tmp", "nettest")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
addr := f.Name()
|
||||
err = f.Close()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
err = os.Remove(addr)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return addr, nil
|
||||
}
|
@@ -4,7 +4,6 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
@@ -14,31 +13,41 @@ import (
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// RemoteSignerClient implements PrivValidator, it uses a socket to request signatures
|
||||
// Socket errors.
|
||||
var (
|
||||
ErrConnTimeout = errors.New("remote signer timed out")
|
||||
)
|
||||
|
||||
// RemoteSignerClient implements PrivValidator.
|
||||
// It uses a net.Conn to request signatures
|
||||
// from an external process.
|
||||
type RemoteSignerClient struct {
|
||||
conn net.Conn
|
||||
conn net.Conn
|
||||
|
||||
// memoized
|
||||
consensusPubKey crypto.PubKey
|
||||
mtx sync.Mutex
|
||||
}
|
||||
|
||||
// Check that RemoteSignerClient implements PrivValidator.
|
||||
var _ types.PrivValidator = (*RemoteSignerClient)(nil)
|
||||
|
||||
// NewRemoteSignerClient returns an instance of RemoteSignerClient.
|
||||
func NewRemoteSignerClient(
|
||||
conn net.Conn,
|
||||
) (*RemoteSignerClient, error) {
|
||||
sc := &RemoteSignerClient{
|
||||
conn: conn,
|
||||
}
|
||||
pubKey, err := sc.getPubKey()
|
||||
func NewRemoteSignerClient(conn net.Conn) (*RemoteSignerClient, error) {
|
||||
|
||||
// retrieve and memoize the consensus public key once.
|
||||
pubKey, err := getPubKey(conn)
|
||||
if err != nil {
|
||||
return nil, cmn.ErrorWrap(err, "error while retrieving public key for remote signer")
|
||||
}
|
||||
// retrieve and memoize the consensus public key once:
|
||||
sc.consensusPubKey = pubKey
|
||||
return sc, nil
|
||||
return &RemoteSignerClient{
|
||||
conn: conn,
|
||||
consensusPubKey: pubKey,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close calls Close on the underlying net.Conn.
|
||||
func (sc *RemoteSignerClient) Close() error {
|
||||
return sc.conn.Close()
|
||||
}
|
||||
|
||||
// GetPubKey implements PrivValidator.
|
||||
@@ -46,16 +55,14 @@ func (sc *RemoteSignerClient) GetPubKey() crypto.PubKey {
|
||||
return sc.consensusPubKey
|
||||
}
|
||||
|
||||
func (sc *RemoteSignerClient) getPubKey() (crypto.PubKey, error) {
|
||||
sc.mtx.Lock()
|
||||
defer sc.mtx.Unlock()
|
||||
|
||||
err := writeMsg(sc.conn, &PubKeyRequest{})
|
||||
// not thread-safe (only called on startup).
|
||||
func getPubKey(conn net.Conn) (crypto.PubKey, error) {
|
||||
err := writeMsg(conn, &PubKeyRequest{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := readMsg(sc.conn)
|
||||
res, err := readMsg(conn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -73,9 +80,6 @@ func (sc *RemoteSignerClient) getPubKey() (crypto.PubKey, error) {
|
||||
|
||||
// SignVote implements PrivValidator.
|
||||
func (sc *RemoteSignerClient) SignVote(chainID string, vote *types.Vote) error {
|
||||
sc.mtx.Lock()
|
||||
defer sc.mtx.Unlock()
|
||||
|
||||
err := writeMsg(sc.conn, &SignVoteRequest{Vote: vote})
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -103,9 +107,6 @@ func (sc *RemoteSignerClient) SignProposal(
|
||||
chainID string,
|
||||
proposal *types.Proposal,
|
||||
) error {
|
||||
sc.mtx.Lock()
|
||||
defer sc.mtx.Unlock()
|
||||
|
||||
err := writeMsg(sc.conn, &SignProposalRequest{Proposal: proposal})
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -129,9 +130,6 @@ func (sc *RemoteSignerClient) SignProposal(
|
||||
|
||||
// Ping is used to check connection health.
|
||||
func (sc *RemoteSignerClient) Ping() error {
|
||||
sc.mtx.Lock()
|
||||
defer sc.mtx.Unlock()
|
||||
|
||||
err := writeMsg(sc.conn, &PingRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
|
@@ -5,6 +5,7 @@ import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
@@ -12,6 +13,11 @@ import (
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// Socket errors.
|
||||
var (
|
||||
ErrDialRetryMax = errors.New("dialed maximum retries")
|
||||
)
|
||||
|
||||
// RemoteSignerOption sets an optional parameter on the RemoteSigner.
|
||||
type RemoteSignerOption func(*RemoteSigner)
|
||||
|
||||
@@ -26,38 +32,64 @@ func RemoteSignerConnRetries(retries int) RemoteSignerOption {
|
||||
return func(ss *RemoteSigner) { ss.connRetries = retries }
|
||||
}
|
||||
|
||||
// RemoteSigner implements PrivValidator by dialing to a socket.
|
||||
// RemoteSigner dials using its dialer and responds to any
|
||||
// signature requests using its privVal.
|
||||
type RemoteSigner struct {
|
||||
cmn.BaseService
|
||||
|
||||
addr string
|
||||
chainID string
|
||||
connDeadline time.Duration
|
||||
connRetries int
|
||||
privKey ed25519.PrivKeyEd25519
|
||||
privVal types.PrivValidator
|
||||
|
||||
conn net.Conn
|
||||
dialer Dialer
|
||||
conn net.Conn
|
||||
}
|
||||
|
||||
// NewRemoteSigner returns an instance of RemoteSigner.
|
||||
// Dialer dials a remote address and returns a net.Conn or an error.
|
||||
type Dialer func() (net.Conn, error)
|
||||
|
||||
// DialTCPFn dials the given tcp addr, using the given connTimeout and privKey for the
|
||||
// authenticated encryption handshake.
|
||||
func DialTCPFn(addr string, connTimeout time.Duration, privKey ed25519.PrivKeyEd25519) Dialer {
|
||||
return func() (net.Conn, error) {
|
||||
conn, err := cmn.Connect(addr)
|
||||
if err == nil {
|
||||
err = conn.SetDeadline(time.Now().Add(connTimeout))
|
||||
}
|
||||
if err == nil {
|
||||
conn, err = p2pconn.MakeSecretConnection(conn, privKey)
|
||||
}
|
||||
return conn, err
|
||||
}
|
||||
}
|
||||
|
||||
// DialUnixFn dials the given unix socket.
|
||||
func DialUnixFn(addr string) Dialer {
|
||||
return func() (net.Conn, error) {
|
||||
unixAddr := &net.UnixAddr{addr, "unix"}
|
||||
return net.DialUnix("unix", nil, unixAddr)
|
||||
}
|
||||
}
|
||||
|
||||
// NewRemoteSigner return a RemoteSigner that will dial using the given
|
||||
// dialer and respond to any signature requests over the connection
|
||||
// using the given privVal.
|
||||
func NewRemoteSigner(
|
||||
logger log.Logger,
|
||||
chainID, socketAddr string,
|
||||
chainID string,
|
||||
privVal types.PrivValidator,
|
||||
privKey ed25519.PrivKeyEd25519,
|
||||
dialer Dialer,
|
||||
) *RemoteSigner {
|
||||
rs := &RemoteSigner{
|
||||
addr: socketAddr,
|
||||
chainID: chainID,
|
||||
connDeadline: time.Second * defaultConnDeadlineSeconds,
|
||||
connRetries: defaultDialRetries,
|
||||
privKey: privKey,
|
||||
privVal: privVal,
|
||||
dialer: dialer,
|
||||
}
|
||||
|
||||
rs.BaseService = *cmn.NewBaseService(logger, "RemoteSigner", rs)
|
||||
|
||||
return rs
|
||||
}
|
||||
|
||||
@@ -68,6 +100,7 @@ func (rs *RemoteSigner) OnStart() error {
|
||||
rs.Logger.Error("OnStart", "err", err)
|
||||
return err
|
||||
}
|
||||
rs.conn = conn
|
||||
|
||||
go rs.handleConnection(conn)
|
||||
|
||||
@@ -91,36 +124,11 @@ func (rs *RemoteSigner) connect() (net.Conn, error) {
|
||||
if retries != rs.connRetries {
|
||||
time.Sleep(rs.connDeadline)
|
||||
}
|
||||
|
||||
conn, err := cmn.Connect(rs.addr)
|
||||
conn, err := rs.dialer()
|
||||
if err != nil {
|
||||
rs.Logger.Error(
|
||||
"connect",
|
||||
"addr", rs.addr,
|
||||
"err", err,
|
||||
)
|
||||
|
||||
rs.Logger.Error("dialing", "err", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if err := conn.SetDeadline(time.Now().Add(connTimeout)); err != nil {
|
||||
rs.Logger.Error(
|
||||
"connect",
|
||||
"err", err,
|
||||
)
|
||||
continue
|
||||
}
|
||||
|
||||
conn, err = p2pconn.MakeSecretConnection(conn, rs.privKey)
|
||||
if err != nil {
|
||||
rs.Logger.Error(
|
||||
"connect",
|
||||
"err", err,
|
||||
)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
@@ -139,7 +147,7 @@ func (rs *RemoteSigner) handleConnection(conn net.Conn) {
|
||||
req, err := readMsg(conn)
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
rs.Logger.Error("handleConnection", "err", err)
|
||||
rs.Logger.Error("handleConnection readMsg", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -148,12 +156,12 @@ func (rs *RemoteSigner) handleConnection(conn net.Conn) {
|
||||
|
||||
if err != nil {
|
||||
// only log the error; we'll reply with an error in res
|
||||
rs.Logger.Error("handleConnection", "err", err)
|
||||
rs.Logger.Error("handleConnection handleRequest", "err", err)
|
||||
}
|
||||
|
||||
err = writeMsg(conn, res)
|
||||
if err != nil {
|
||||
rs.Logger.Error("handleConnection", "err", err)
|
||||
rs.Logger.Error("handleConnection writeMsg", "err", err)
|
||||
return
|
||||
}
|
||||
}
|
184
privval/socket.go
Normal file
184
privval/socket.go
Normal file
@@ -0,0 +1,184 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
p2pconn "github.com/tendermint/tendermint/p2p/conn"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultAcceptDeadlineSeconds = 3
|
||||
defaultConnDeadlineSeconds = 3
|
||||
)
|
||||
|
||||
// timeoutError can be used to check if an error returned from the netp package
|
||||
// was due to a timeout.
|
||||
type timeoutError interface {
|
||||
Timeout() bool
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------
|
||||
// TCP Listener
|
||||
|
||||
// TCPListenerOption sets an optional parameter on the tcpListener.
|
||||
type TCPListenerOption func(*tcpListener)
|
||||
|
||||
// TCPListenerAcceptDeadline sets the deadline for the listener.
|
||||
// A zero time value disables the deadline.
|
||||
func TCPListenerAcceptDeadline(deadline time.Duration) TCPListenerOption {
|
||||
return func(tl *tcpListener) { tl.acceptDeadline = deadline }
|
||||
}
|
||||
|
||||
// TCPListenerConnDeadline sets the read and write deadline for connections
|
||||
// from external signing processes.
|
||||
func TCPListenerConnDeadline(deadline time.Duration) TCPListenerOption {
|
||||
return func(tl *tcpListener) { tl.connDeadline = deadline }
|
||||
}
|
||||
|
||||
// tcpListener implements net.Listener.
|
||||
var _ net.Listener = (*tcpListener)(nil)
|
||||
|
||||
// tcpListener wraps a *net.TCPListener to standardise protocol timeouts
|
||||
// and potentially other tuning parameters. It also returns encrypted connections.
|
||||
type tcpListener struct {
|
||||
*net.TCPListener
|
||||
|
||||
secretConnKey ed25519.PrivKeyEd25519
|
||||
|
||||
acceptDeadline time.Duration
|
||||
connDeadline time.Duration
|
||||
}
|
||||
|
||||
// NewTCPListener returns a listener that accepts authenticated encrypted connections
|
||||
// using the given secretConnKey and the default timeout values.
|
||||
func NewTCPListener(ln net.Listener, secretConnKey ed25519.PrivKeyEd25519) *tcpListener {
|
||||
return &tcpListener{
|
||||
TCPListener: ln.(*net.TCPListener),
|
||||
secretConnKey: secretConnKey,
|
||||
acceptDeadline: time.Second * defaultAcceptDeadlineSeconds,
|
||||
connDeadline: time.Second * defaultConnDeadlineSeconds,
|
||||
}
|
||||
}
|
||||
|
||||
// Accept implements net.Listener.
|
||||
func (ln *tcpListener) Accept() (net.Conn, error) {
|
||||
err := ln.SetDeadline(time.Now().Add(ln.acceptDeadline))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tc, err := ln.AcceptTCP()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wrap the conn in our timeout and encryption wrappers
|
||||
timeoutConn := newTimeoutConn(tc, ln.connDeadline)
|
||||
secretConn, err := p2pconn.MakeSecretConnection(timeoutConn, ln.secretConnKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return secretConn, nil
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------
|
||||
// Unix Listener
|
||||
|
||||
// unixListener implements net.Listener.
|
||||
var _ net.Listener = (*unixListener)(nil)
|
||||
|
||||
type UnixListenerOption func(*unixListener)
|
||||
|
||||
// UnixListenerAcceptDeadline sets the deadline for the listener.
|
||||
// A zero time value disables the deadline.
|
||||
func UnixListenerAcceptDeadline(deadline time.Duration) UnixListenerOption {
|
||||
return func(ul *unixListener) { ul.acceptDeadline = deadline }
|
||||
}
|
||||
|
||||
// UnixListenerConnDeadline sets the read and write deadline for connections
|
||||
// from external signing processes.
|
||||
func UnixListenerConnDeadline(deadline time.Duration) UnixListenerOption {
|
||||
return func(ul *unixListener) { ul.connDeadline = deadline }
|
||||
}
|
||||
|
||||
// unixListener wraps a *net.UnixListener to standardise protocol timeouts
|
||||
// and potentially other tuning parameters. It returns unencrypted connections.
|
||||
type unixListener struct {
|
||||
*net.UnixListener
|
||||
|
||||
acceptDeadline time.Duration
|
||||
connDeadline time.Duration
|
||||
}
|
||||
|
||||
// NewUnixListener returns a listener that accepts unencrypted connections
|
||||
// using the default timeout values.
|
||||
func NewUnixListener(ln net.Listener) *unixListener {
|
||||
return &unixListener{
|
||||
UnixListener: ln.(*net.UnixListener),
|
||||
acceptDeadline: time.Second * defaultAcceptDeadlineSeconds,
|
||||
connDeadline: time.Second * defaultConnDeadlineSeconds,
|
||||
}
|
||||
}
|
||||
|
||||
// Accept implements net.Listener.
|
||||
func (ln *unixListener) Accept() (net.Conn, error) {
|
||||
err := ln.SetDeadline(time.Now().Add(ln.acceptDeadline))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tc, err := ln.AcceptUnix()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wrap the conn in our timeout wrapper
|
||||
conn := newTimeoutConn(tc, ln.connDeadline)
|
||||
|
||||
// TODO: wrap in something that authenticates
|
||||
// with a MAC - https://github.com/tendermint/tendermint/issues/3099
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------
|
||||
// Connection
|
||||
|
||||
// timeoutConn implements net.Conn.
|
||||
var _ net.Conn = (*timeoutConn)(nil)
|
||||
|
||||
// timeoutConn wraps a net.Conn to standardise protocol timeouts / deadline resets.
|
||||
type timeoutConn struct {
|
||||
net.Conn
|
||||
|
||||
connDeadline time.Duration
|
||||
}
|
||||
|
||||
// newTimeoutConn returns an instance of newTCPTimeoutConn.
|
||||
func newTimeoutConn(
|
||||
conn net.Conn,
|
||||
connDeadline time.Duration) *timeoutConn {
|
||||
return &timeoutConn{
|
||||
conn,
|
||||
connDeadline,
|
||||
}
|
||||
}
|
||||
|
||||
// Read implements net.Conn.
|
||||
func (c timeoutConn) Read(b []byte) (n int, err error) {
|
||||
// Reset deadline
|
||||
c.Conn.SetReadDeadline(time.Now().Add(c.connDeadline))
|
||||
|
||||
return c.Conn.Read(b)
|
||||
}
|
||||
|
||||
// Write implements net.Conn.
|
||||
func (c timeoutConn) Write(b []byte) (n int, err error) {
|
||||
// Reset deadline
|
||||
c.Conn.SetWriteDeadline(time.Now().Add(c.connDeadline))
|
||||
|
||||
return c.Conn.Write(b)
|
||||
}
|
@@ -4,17 +4,31 @@ import (
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
)
|
||||
|
||||
func TestTCPTimeoutListenerAcceptDeadline(t *testing.T) {
|
||||
//-------------------------------------------
|
||||
// helper funcs
|
||||
|
||||
func newPrivKey() ed25519.PrivKeyEd25519 {
|
||||
return ed25519.GenPrivKey()
|
||||
}
|
||||
|
||||
//-------------------------------------------
|
||||
// tests
|
||||
|
||||
func TestTCPListenerAcceptDeadline(t *testing.T) {
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ln = newTCPTimeoutListener(ln, time.Millisecond, time.Second, time.Second)
|
||||
tcpLn := NewTCPListener(ln, newPrivKey())
|
||||
TCPListenerAcceptDeadline(time.Millisecond)(tcpLn)
|
||||
TCPListenerConnDeadline(time.Second)(tcpLn)
|
||||
|
||||
_, err = ln.Accept()
|
||||
_, err = tcpLn.Accept()
|
||||
opErr, ok := err.(*net.OpError)
|
||||
if !ok {
|
||||
t.Fatalf("have %v, want *net.OpError", err)
|
||||
@@ -25,14 +39,17 @@ func TestTCPTimeoutListenerAcceptDeadline(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestTCPTimeoutListenerConnDeadline(t *testing.T) {
|
||||
func TestTCPListenerConnDeadline(t *testing.T) {
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ln = newTCPTimeoutListener(ln, time.Second, time.Millisecond, time.Second)
|
||||
tcpLn := NewTCPListener(ln, newPrivKey())
|
||||
TCPListenerAcceptDeadline(time.Second)(tcpLn)
|
||||
TCPListenerConnDeadline(time.Millisecond)(tcpLn)
|
||||
|
||||
readyc := make(chan struct{})
|
||||
donec := make(chan struct{})
|
||||
go func(ln net.Listener) {
|
||||
defer close(donec)
|
||||
@@ -41,6 +58,7 @@ func TestTCPTimeoutListenerConnDeadline(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
<-readyc
|
||||
|
||||
time.Sleep(2 * time.Millisecond)
|
||||
|
||||
@@ -54,12 +72,13 @@ func TestTCPTimeoutListenerConnDeadline(t *testing.T) {
|
||||
if have, want := opErr.Op, "read"; have != want {
|
||||
t.Errorf("have %v, want %v", have, want)
|
||||
}
|
||||
}(ln)
|
||||
}(tcpLn)
|
||||
|
||||
_, err = net.Dial("tcp", ln.Addr().String())
|
||||
dialer := DialTCPFn(ln.Addr().String(), testConnDeadline, newPrivKey())
|
||||
_, err = dialer()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
close(readyc)
|
||||
<-donec
|
||||
}
|
216
privval/tcp.go
216
privval/tcp.go
@@ -1,216 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
p2pconn "github.com/tendermint/tendermint/p2p/conn"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultAcceptDeadlineSeconds = 3
|
||||
defaultConnDeadlineSeconds = 3
|
||||
defaultConnHeartBeatSeconds = 2
|
||||
defaultDialRetries = 10
|
||||
)
|
||||
|
||||
// Socket errors.
|
||||
var (
|
||||
ErrDialRetryMax = errors.New("dialed maximum retries")
|
||||
ErrConnTimeout = errors.New("remote signer timed out")
|
||||
ErrUnexpectedResponse = errors.New("received unexpected response")
|
||||
)
|
||||
|
||||
var (
|
||||
acceptDeadline = time.Second * defaultAcceptDeadlineSeconds
|
||||
connTimeout = time.Second * defaultConnDeadlineSeconds
|
||||
connHeartbeat = time.Second * defaultConnHeartBeatSeconds
|
||||
)
|
||||
|
||||
// TCPValOption sets an optional parameter on the SocketPV.
|
||||
type TCPValOption func(*TCPVal)
|
||||
|
||||
// TCPValAcceptDeadline sets the deadline for the TCPVal listener.
|
||||
// A zero time value disables the deadline.
|
||||
func TCPValAcceptDeadline(deadline time.Duration) TCPValOption {
|
||||
return func(sc *TCPVal) { sc.acceptDeadline = deadline }
|
||||
}
|
||||
|
||||
// TCPValConnTimeout sets the read and write timeout for connections
|
||||
// from external signing processes.
|
||||
func TCPValConnTimeout(timeout time.Duration) TCPValOption {
|
||||
return func(sc *TCPVal) { sc.connTimeout = timeout }
|
||||
}
|
||||
|
||||
// TCPValHeartbeat sets the period on which to check the liveness of the
|
||||
// connected Signer connections.
|
||||
func TCPValHeartbeat(period time.Duration) TCPValOption {
|
||||
return func(sc *TCPVal) { sc.connHeartbeat = period }
|
||||
}
|
||||
|
||||
// TCPVal implements PrivValidator, it uses a socket to request signatures
|
||||
// from an external process.
|
||||
type TCPVal struct {
|
||||
cmn.BaseService
|
||||
*RemoteSignerClient
|
||||
|
||||
addr string
|
||||
acceptDeadline time.Duration
|
||||
connTimeout time.Duration
|
||||
connHeartbeat time.Duration
|
||||
privKey ed25519.PrivKeyEd25519
|
||||
|
||||
conn net.Conn
|
||||
listener net.Listener
|
||||
cancelPing chan struct{}
|
||||
pingTicker *time.Ticker
|
||||
}
|
||||
|
||||
// Check that TCPVal implements PrivValidator.
|
||||
var _ types.PrivValidator = (*TCPVal)(nil)
|
||||
|
||||
// NewTCPVal returns an instance of TCPVal.
|
||||
func NewTCPVal(
|
||||
logger log.Logger,
|
||||
socketAddr string,
|
||||
privKey ed25519.PrivKeyEd25519,
|
||||
) *TCPVal {
|
||||
sc := &TCPVal{
|
||||
addr: socketAddr,
|
||||
acceptDeadline: acceptDeadline,
|
||||
connTimeout: connTimeout,
|
||||
connHeartbeat: connHeartbeat,
|
||||
privKey: privKey,
|
||||
}
|
||||
|
||||
sc.BaseService = *cmn.NewBaseService(logger, "TCPVal", sc)
|
||||
|
||||
return sc
|
||||
}
|
||||
|
||||
// OnStart implements cmn.Service.
|
||||
func (sc *TCPVal) OnStart() error {
|
||||
if err := sc.listen(); err != nil {
|
||||
sc.Logger.Error("OnStart", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
conn, err := sc.waitConnection()
|
||||
if err != nil {
|
||||
sc.Logger.Error("OnStart", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
sc.conn = conn
|
||||
sc.RemoteSignerClient, err = NewRemoteSignerClient(sc.conn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start a routine to keep the connection alive
|
||||
sc.cancelPing = make(chan struct{}, 1)
|
||||
sc.pingTicker = time.NewTicker(sc.connHeartbeat)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-sc.pingTicker.C:
|
||||
err := sc.Ping()
|
||||
if err != nil {
|
||||
sc.Logger.Error(
|
||||
"Ping",
|
||||
"err", err,
|
||||
)
|
||||
}
|
||||
case <-sc.cancelPing:
|
||||
sc.pingTicker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements cmn.Service.
|
||||
func (sc *TCPVal) OnStop() {
|
||||
if sc.cancelPing != nil {
|
||||
close(sc.cancelPing)
|
||||
}
|
||||
|
||||
if sc.conn != nil {
|
||||
if err := sc.conn.Close(); err != nil {
|
||||
sc.Logger.Error("OnStop", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
if sc.listener != nil {
|
||||
if err := sc.listener.Close(); err != nil {
|
||||
sc.Logger.Error("OnStop", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sc *TCPVal) acceptConnection() (net.Conn, error) {
|
||||
conn, err := sc.listener.Accept()
|
||||
if err != nil {
|
||||
if !sc.IsRunning() {
|
||||
return nil, nil // Ignore error from listener closing.
|
||||
}
|
||||
return nil, err
|
||||
|
||||
}
|
||||
|
||||
conn, err = p2pconn.MakeSecretConnection(conn, sc.privKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (sc *TCPVal) listen() error {
|
||||
ln, err := net.Listen(cmn.ProtocolAndAddress(sc.addr))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sc.listener = newTCPTimeoutListener(
|
||||
ln,
|
||||
sc.acceptDeadline,
|
||||
sc.connTimeout,
|
||||
sc.connHeartbeat,
|
||||
)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// waitConnection uses the configured wait timeout to error if no external
|
||||
// process connects in the time period.
|
||||
func (sc *TCPVal) waitConnection() (net.Conn, error) {
|
||||
var (
|
||||
connc = make(chan net.Conn, 1)
|
||||
errc = make(chan error, 1)
|
||||
)
|
||||
|
||||
go func(connc chan<- net.Conn, errc chan<- error) {
|
||||
conn, err := sc.acceptConnection()
|
||||
if err != nil {
|
||||
errc <- err
|
||||
return
|
||||
}
|
||||
|
||||
connc <- conn
|
||||
}(connc, errc)
|
||||
|
||||
select {
|
||||
case conn := <-connc:
|
||||
return conn, nil
|
||||
case err := <-errc:
|
||||
return nil, err
|
||||
}
|
||||
}
|
@@ -1,90 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
// timeoutError can be used to check if an error returned from the netp package
|
||||
// was due to a timeout.
|
||||
type timeoutError interface {
|
||||
Timeout() bool
|
||||
}
|
||||
|
||||
// tcpTimeoutListener implements net.Listener.
|
||||
var _ net.Listener = (*tcpTimeoutListener)(nil)
|
||||
|
||||
// tcpTimeoutListener wraps a *net.TCPListener to standardise protocol timeouts
|
||||
// and potentially other tuning parameters.
|
||||
type tcpTimeoutListener struct {
|
||||
*net.TCPListener
|
||||
|
||||
acceptDeadline time.Duration
|
||||
connDeadline time.Duration
|
||||
period time.Duration
|
||||
}
|
||||
|
||||
// timeoutConn wraps a net.Conn to standardise protocol timeouts / deadline resets.
|
||||
type timeoutConn struct {
|
||||
net.Conn
|
||||
|
||||
connDeadline time.Duration
|
||||
}
|
||||
|
||||
// newTCPTimeoutListener returns an instance of tcpTimeoutListener.
|
||||
func newTCPTimeoutListener(
|
||||
ln net.Listener,
|
||||
acceptDeadline, connDeadline time.Duration,
|
||||
period time.Duration,
|
||||
) tcpTimeoutListener {
|
||||
return tcpTimeoutListener{
|
||||
TCPListener: ln.(*net.TCPListener),
|
||||
acceptDeadline: acceptDeadline,
|
||||
connDeadline: connDeadline,
|
||||
period: period,
|
||||
}
|
||||
}
|
||||
|
||||
// newTimeoutConn returns an instance of newTCPTimeoutConn.
|
||||
func newTimeoutConn(
|
||||
conn net.Conn,
|
||||
connDeadline time.Duration) *timeoutConn {
|
||||
return &timeoutConn{
|
||||
conn,
|
||||
connDeadline,
|
||||
}
|
||||
}
|
||||
|
||||
// Accept implements net.Listener.
|
||||
func (ln tcpTimeoutListener) Accept() (net.Conn, error) {
|
||||
err := ln.SetDeadline(time.Now().Add(ln.acceptDeadline))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tc, err := ln.AcceptTCP()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Wrap the conn in our timeout wrapper
|
||||
conn := newTimeoutConn(tc, ln.connDeadline)
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
// Read implements net.Listener.
|
||||
func (c timeoutConn) Read(b []byte) (n int, err error) {
|
||||
// Reset deadline
|
||||
c.Conn.SetReadDeadline(time.Now().Add(c.connDeadline))
|
||||
|
||||
return c.Conn.Read(b)
|
||||
}
|
||||
|
||||
// Write implements net.Listener.
|
||||
func (c timeoutConn) Write(b []byte) (n int, err error) {
|
||||
// Reset deadline
|
||||
c.Conn.SetWriteDeadline(time.Now().Add(c.connDeadline))
|
||||
|
||||
return c.Conn.Write(b)
|
||||
}
|
@@ -428,5 +428,10 @@ func TestTxSearch(t *testing.T) {
|
||||
if len(result.Txs) == 0 {
|
||||
t.Fatal("expected a lot of transactions")
|
||||
}
|
||||
|
||||
// query a non existing tx with page 1 and txsPerPage 1
|
||||
result, err = c.TxSearch("app.creator='Cosmoshi Neetowoko'", true, 1, 1)
|
||||
require.Nil(t, err, "%+v", err)
|
||||
require.Len(t, result.Txs, 0)
|
||||
}
|
||||
}
|
||||
|
@@ -53,6 +53,7 @@ func NetInfo() (*ctypes.ResultNetInfo, error) {
|
||||
NodeInfo: nodeInfo,
|
||||
IsOutbound: peer.IsOutbound(),
|
||||
ConnectionStatus: peer.Status(),
|
||||
RemoteIP: peer.RemoteIP(),
|
||||
})
|
||||
}
|
||||
// TODO: Should we include PersistentPeers and Seeds in here?
|
||||
|
@@ -154,3 +154,12 @@ func validatePerPage(perPage int) int {
|
||||
}
|
||||
return perPage
|
||||
}
|
||||
|
||||
func validateSkipCount(page, perPage int) int {
|
||||
skipCount := (page - 1) * perPage
|
||||
if skipCount < 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
return skipCount
|
||||
}
|
||||
|
@@ -201,10 +201,11 @@ func TxSearch(query string, prove bool, page, perPage int) (*ctypes.ResultTxSear
|
||||
totalCount := len(results)
|
||||
perPage = validatePerPage(perPage)
|
||||
page = validatePage(page, perPage, totalCount)
|
||||
skipCount := (page - 1) * perPage
|
||||
skipCount := validateSkipCount(page, perPage)
|
||||
|
||||
apiResults := make([]*ctypes.ResultTx, cmn.MinInt(perPage, totalCount-skipCount))
|
||||
var proof types.TxProof
|
||||
// if there's no tx in the results array, we don't need to loop through the apiResults array
|
||||
for i := 0; i < len(apiResults); i++ {
|
||||
r := results[skipCount+i]
|
||||
height := r.Height
|
||||
|
@@ -2,6 +2,7 @@ package core_types
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
@@ -110,6 +111,7 @@ type Peer struct {
|
||||
NodeInfo p2p.DefaultNodeInfo `json:"node_info"`
|
||||
IsOutbound bool `json:"is_outbound"`
|
||||
ConnectionStatus p2p.ConnectionStatus `json:"connection_status"`
|
||||
RemoteIP net.IP `json:"remote_ip"`
|
||||
}
|
||||
|
||||
// Validators for a height
|
||||
|
@@ -62,7 +62,7 @@ func TestEvidence(t *testing.T) {
|
||||
{vote1, makeVote(val, chainID, 0, 10, 3, 1, blockID2), false}, // wrong round
|
||||
{vote1, makeVote(val, chainID, 0, 10, 2, 2, blockID2), false}, // wrong step
|
||||
{vote1, makeVote(val2, chainID, 0, 10, 2, 1, blockID), false}, // wrong validator
|
||||
{vote1, badVote, false}, // signed by wrong key
|
||||
{vote1, badVote, false}, // signed by wrong key
|
||||
}
|
||||
|
||||
pubKey := val.GetPubKey()
|
||||
|
Reference in New Issue
Block a user