mirror of
https://github.com/fluencelabs/tendermint
synced 2025-07-31 04:01:55 +00:00
Compare commits
26 Commits
anton/3853
...
anton/chan
Author | SHA1 | Date | |
---|---|---|---|
|
a507d2a019 | ||
|
64e38e279e | ||
|
0cf8812b17 | ||
|
f7f034a8be | ||
|
4da3de79a7 | ||
|
f5b116c687 | ||
|
be06316c84 | ||
|
8ba8497ac8 | ||
|
23fa2e1f1b | ||
|
ba9cdeaed9 | ||
|
8ed1400949 | ||
|
dbf4062acd | ||
|
79e924b34f | ||
|
0c9a284f8d | ||
|
245e1c9ef7 | ||
|
470f1efcc8 | ||
|
4e3b6bfb26 | ||
|
7a86e49312 | ||
|
744d65f173 | ||
|
1b69c6b56b | ||
|
e5084a4787 | ||
|
0787b79347 | ||
|
823d916a11 | ||
|
e8926867d8 | ||
|
0f076e5fbe | ||
|
ac232caef3 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -43,3 +43,5 @@ terraform.tfstate.backup
|
||||
terraform.tfstate.d
|
||||
|
||||
.vscode
|
||||
|
||||
profile\.out
|
||||
|
42
CHANGELOG.md
42
CHANGELOG.md
@@ -1,10 +1,48 @@
|
||||
# Changelog
|
||||
|
||||
## v0.32.2
|
||||
|
||||
*July 31, 2019*
|
||||
|
||||
Special thanks to external contributors on this release:
|
||||
@ruseinov, @bluele, @guagualvcha
|
||||
|
||||
Friendly reminder, we have a [bug bounty
|
||||
program](https://hackerone.com/tendermint).
|
||||
|
||||
### BREAKING CHANGES:
|
||||
|
||||
- Go API
|
||||
- [libs] [\#3811](https://github.com/tendermint/tendermint/issues/3811) Remove `db` from libs in favor of `https://github.com/tendermint/tm-db`
|
||||
|
||||
### FEATURES:
|
||||
|
||||
- [node] [\#3846](https://github.com/tendermint/tendermint/pull/3846) Allow replacing existing p2p.Reactor(s) using [`CustomReactors`
|
||||
option](https://godoc.org/github.com/tendermint/tendermint/node#CustomReactors).
|
||||
Warning: beware of accidental name clashes. Here is the list of existing
|
||||
reactors: MEMPOOL, BLOCKCHAIN, CONSENSUS, EVIDENCE, PEX.
|
||||
- [p2p] [\#3834](https://github.com/tendermint/tendermint/issues/3834) Do not write 'Couldn't connect to any seeds' error log if there are no seeds in config file
|
||||
- [rpc] [\#3818](https://github.com/tendermint/tendermint/issues/3818) Make `max_body_bytes` and `max_header_bytes` configurable(@bluele)
|
||||
- [mempool] [\#3826](https://github.com/tendermint/tendermint/issues/3826) Make `max_msg_bytes` configurable(@bluele)
|
||||
- [blockchain] [\#3561](https://github.com/tendermint/tendermint/issues/3561) Add early version of the new blockchain reactor, which is supposed to be more modular and testable compared to the old version. To try it, you'll have to change `version` in the config file, [here](https://github.com/tendermint/tendermint/blob/master/config/toml.go#L303) NOTE: It's not ready for a production yet. For further information, see [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md) & [ADR-43](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-043-blockchain-riri-org.md)
|
||||
|
||||
### IMPROVEMENTS:
|
||||
|
||||
- [abci] [\#3809](https://github.com/tendermint/tendermint/issues/3809) Recover from application panics in `server/socket_server.go` to allow socket cleanup (@ruseinov)
|
||||
- [rpc] [\#2252](https://github.com/tendermint/tendermint/issues/2252) Add `/broadcast_evidence` endpoint to submit double signing and other types of evidence
|
||||
- [p2p] [\#3664](https://github.com/tendermint/tendermint/issues/3664) p2p/conn: reuse buffer when write/read from secret connection(@guagualvcha)
|
||||
- [rpc] [\#3076](https://github.com/tendermint/tendermint/issues/3076) Improve transaction search performance
|
||||
|
||||
### BUG FIXES:
|
||||
|
||||
- [p2p] [\#3644](https://github.com/tendermint/tendermint/issues/3644) Fix error logging for connection stop (@defunctzombie)
|
||||
- [rpc] [\#3813](https://github.com/tendermint/tendermint/issues/3813) Return err if page is incorrect (less than 0 or greater than total pages)
|
||||
|
||||
## v0.32.1
|
||||
|
||||
*July 15, 2019*
|
||||
|
||||
Special thanks to external contributors on this release:
|
||||
Special thanks to external contributors on this release:
|
||||
@ParthDesai, @climber73, @jim380, @ashleyvega
|
||||
|
||||
This release contains a minor enhancement to the ABCI and some breaking changes to our libs folder, namely:
|
||||
@@ -26,7 +64,7 @@ program](https://hackerone.com/tendermint).
|
||||
|
||||
### FEATURES:
|
||||
|
||||
- [node] Add variadic argument to `NewNode` to support functional options, allowing the Node to be more easily customized.
|
||||
- [node] Add variadic argument to `NewNode` to support functional options, allowing the Node to be more easily customized.
|
||||
- [node][\#3730](https://github.com/tendermint/tendermint/pull/3730) Add `CustomReactors` option to `NewNode` allowing caller to pass
|
||||
custom reactors to run inside Tendermint node (@ParthDesai)
|
||||
- [abci] [\#2127](https://github.com/tendermint/tendermint/issues/2127)RequestCheckTx has a new field, `CheckTxType`, which can take values of `CheckTxType_New` and `CheckTxType_Recheck`, indicating whether this is a new tx being checked for the first time or whether this tx is being rechecked after a block commit. This allows applications to skip certain expensive operations, like signature checking, if they've already been done once. see [docs](https://github.com/tendermint/tendermint/blob/eddb433d7c082efbeaf8974413a36641519ee895/docs/spec/abci/apps.md#mempool-connection)
|
||||
|
@@ -1,4 +1,4 @@
|
||||
## v0.32.2
|
||||
## v0.32.3
|
||||
|
||||
\*\*
|
||||
|
||||
@@ -14,27 +14,14 @@ program](https://hackerone.com/tendermint).
|
||||
- Apps
|
||||
|
||||
- Go API
|
||||
- [libs] \#3811 Remove `db` from libs in favor of `https://github.com/tendermint/tm-db`
|
||||
|
||||
### FEATURES:
|
||||
|
||||
- [node] Allow replacing existing p2p.Reactor(s) using [`CustomReactors`
|
||||
option](https://godoc.org/github.com/tendermint/tendermint/node#CustomReactors).
|
||||
Warning: beware of accidental name clashes. Here is the list of existing
|
||||
reactors: MEMPOOL, BLOCKCHAIN, CONSENSUS, EVIDENCE, PEX.
|
||||
|
||||
### IMPROVEMENTS:
|
||||
|
||||
- [p2p] \#3834 Do not write 'Couldn't connect to any seeds' error log if there are no seeds in config file
|
||||
- [abci] \#3809 Recover from application panics in `server/socket_server.go` to allow socket cleanup (@ruseinov)
|
||||
- [rpc] \#2252 Add `/broadcast_evidence` endpoint to submit double signing and other types of evidence
|
||||
- [rpc] \#3818 Make `max_body_bytes` and `max_header_bytes` configurable
|
||||
- [p2p] \#3664 p2p/conn: reuse buffer when write/read from secret connection
|
||||
- [mempool] \#3826 Make `max_msg_bytes` configurable
|
||||
- [blockchain] \#3561 Add early version of the new blockchain reactor, which is supposed to be more modular and testable compared to the old version. To try it, you'll have to change `version` in the config file, [here](https://github.com/tendermint/tendermint/blob/master/config/toml.go#L303) NOTE: It's not ready for a production yet. For further information, see [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md) & [ADR-43](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-043-blockchain-riri-org.md)
|
||||
- [rpc] \#3076 Improve transaction search performance
|
||||
- [consensus] \#3839 Reduce "Error attempting to add vote" message severity (Error -> Info)
|
||||
- [privval] \#3370 Refactors and simplifies validator/kms connection handling. Please refer to [this comment](https://github.com/tendermint/tendermint/pull/3370#issue-257360971) for details.
|
||||
|
||||
### BUG FIXES:
|
||||
|
||||
- [p2p][\#3644](https://github.com/tendermint/tendermint/pull/3644) Fix error logging for connection stop (@defunctzombie)
|
||||
- [rpc] \#3813 Return err if page is incorrect (less than 0 or greater than total pages)
|
||||
- [config] \#3868 move misplaced `max_msg_bytes` into mempool section
|
||||
|
387
blockchain/v2/schedule.go
Normal file
387
blockchain/v2/schedule.go
Normal file
@@ -0,0 +1,387 @@
|
||||
// nolint:unused
|
||||
package v2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
)
|
||||
|
||||
type Event interface{}
|
||||
|
||||
type blockState int
|
||||
|
||||
const (
|
||||
blockStateUnknown blockState = iota
|
||||
blockStateNew
|
||||
blockStatePending
|
||||
blockStateReceived
|
||||
blockStateProcessed
|
||||
)
|
||||
|
||||
func (e blockState) String() string {
|
||||
switch e {
|
||||
case blockStateUnknown:
|
||||
return "Unknown"
|
||||
case blockStateNew:
|
||||
return "New"
|
||||
case blockStatePending:
|
||||
return "Pending"
|
||||
case blockStateReceived:
|
||||
return "Received"
|
||||
case blockStateProcessed:
|
||||
return "Processed"
|
||||
default:
|
||||
return fmt.Sprintf("unknown blockState: %d", e)
|
||||
}
|
||||
}
|
||||
|
||||
type peerState int
|
||||
|
||||
const (
|
||||
peerStateNew = iota
|
||||
peerStateReady
|
||||
peerStateRemoved
|
||||
)
|
||||
|
||||
func (e peerState) String() string {
|
||||
switch e {
|
||||
case peerStateNew:
|
||||
return "New"
|
||||
case peerStateReady:
|
||||
return "Ready"
|
||||
case peerStateRemoved:
|
||||
return "Removed"
|
||||
default:
|
||||
return fmt.Sprintf("unknown peerState: %d", e)
|
||||
}
|
||||
}
|
||||
|
||||
type scPeer struct {
|
||||
peerID p2p.ID
|
||||
state peerState
|
||||
height int64
|
||||
lastTouched time.Time
|
||||
lastRate int64
|
||||
}
|
||||
|
||||
func newScPeer(peerID p2p.ID) *scPeer {
|
||||
return &scPeer{
|
||||
peerID: peerID,
|
||||
state: peerStateNew,
|
||||
height: -1,
|
||||
lastTouched: time.Time{},
|
||||
}
|
||||
}
|
||||
|
||||
// The schedule is a composite data structure which allows a scheduler to keep
|
||||
// track of which blocks have been scheduled into which state.
|
||||
type schedule struct {
|
||||
initHeight int64
|
||||
// a list of blocks in which blockState
|
||||
blockStates map[int64]blockState
|
||||
|
||||
// a map of peerID to schedule specific peer struct `scPeer` used to keep
|
||||
// track of peer specific state
|
||||
peers map[p2p.ID]*scPeer
|
||||
|
||||
// a map of heights to the peer we are waiting for a response from
|
||||
pendingBlocks map[int64]p2p.ID
|
||||
|
||||
// the time at which a block was put in blockStatePending
|
||||
pendingTime map[int64]time.Time
|
||||
|
||||
// the peerID of the peer which put the block in blockStateReceived
|
||||
receivedBlocks map[int64]p2p.ID
|
||||
}
|
||||
|
||||
func newSchedule(initHeight int64) *schedule {
|
||||
sc := schedule{
|
||||
initHeight: initHeight,
|
||||
blockStates: make(map[int64]blockState),
|
||||
peers: make(map[p2p.ID]*scPeer),
|
||||
pendingBlocks: make(map[int64]p2p.ID),
|
||||
pendingTime: make(map[int64]time.Time),
|
||||
receivedBlocks: make(map[int64]p2p.ID),
|
||||
}
|
||||
|
||||
sc.setStateAtHeight(initHeight, blockStateNew)
|
||||
|
||||
return &sc
|
||||
}
|
||||
|
||||
func (sc *schedule) addPeer(peerID p2p.ID) error {
|
||||
if _, ok := sc.peers[peerID]; ok {
|
||||
return fmt.Errorf("Cannot add duplicate peer %s", peerID)
|
||||
}
|
||||
sc.peers[peerID] = newScPeer(peerID)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sc *schedule) touchPeer(peerID p2p.ID, time time.Time) error {
|
||||
peer, ok := sc.peers[peerID]
|
||||
if !ok {
|
||||
return fmt.Errorf("Couldn't find peer %s", peerID)
|
||||
}
|
||||
|
||||
if peer.state == peerStateRemoved {
|
||||
return fmt.Errorf("Tried to touch peer in peerStateRemoved")
|
||||
}
|
||||
|
||||
peer.lastTouched = time
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sc *schedule) removePeer(peerID p2p.ID) error {
|
||||
peer, ok := sc.peers[peerID]
|
||||
if !ok {
|
||||
return fmt.Errorf("Couldn't find peer %s", peerID)
|
||||
}
|
||||
|
||||
if peer.state == peerStateRemoved {
|
||||
return fmt.Errorf("Tried to remove peer %s in peerStateRemoved", peerID)
|
||||
}
|
||||
|
||||
for height, pendingPeerID := range sc.pendingBlocks {
|
||||
if pendingPeerID == peerID {
|
||||
sc.setStateAtHeight(height, blockStateNew)
|
||||
delete(sc.pendingTime, height)
|
||||
delete(sc.pendingBlocks, height)
|
||||
}
|
||||
}
|
||||
|
||||
for height, rcvPeerID := range sc.receivedBlocks {
|
||||
if rcvPeerID == peerID {
|
||||
sc.setStateAtHeight(height, blockStateNew)
|
||||
delete(sc.receivedBlocks, height)
|
||||
}
|
||||
}
|
||||
|
||||
peer.state = peerStateRemoved
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error {
|
||||
peer, ok := sc.peers[peerID]
|
||||
if !ok {
|
||||
return fmt.Errorf("Can't find peer %s", peerID)
|
||||
}
|
||||
|
||||
if peer.state == peerStateRemoved {
|
||||
return fmt.Errorf("Cannot set peer height for a peer in peerStateRemoved")
|
||||
}
|
||||
|
||||
if height < peer.height {
|
||||
return fmt.Errorf("Cannot move peer height lower. from %d to %d", peer.height, height)
|
||||
}
|
||||
|
||||
peer.height = height
|
||||
peer.state = peerStateReady
|
||||
for i := sc.minHeight(); i <= height; i++ {
|
||||
if sc.getStateAtHeight(i) == blockStateUnknown {
|
||||
sc.setStateAtHeight(i, blockStateNew)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sc *schedule) getStateAtHeight(height int64) blockState {
|
||||
if height < sc.initHeight {
|
||||
return blockStateProcessed
|
||||
} else if state, ok := sc.blockStates[height]; ok {
|
||||
return state
|
||||
} else {
|
||||
return blockStateUnknown
|
||||
}
|
||||
}
|
||||
|
||||
func (sc *schedule) getPeersAtHeight(height int64) []*scPeer {
|
||||
peers := []*scPeer{}
|
||||
for _, peer := range sc.peers {
|
||||
if peer.height >= height {
|
||||
peers = append(peers, peer)
|
||||
}
|
||||
}
|
||||
|
||||
return peers
|
||||
}
|
||||
|
||||
func (sc *schedule) peersInactiveSince(duration time.Duration, now time.Time) []p2p.ID {
|
||||
peers := []p2p.ID{}
|
||||
for _, peer := range sc.peers {
|
||||
if now.Sub(peer.lastTouched) > duration {
|
||||
peers = append(peers, peer.peerID)
|
||||
}
|
||||
}
|
||||
|
||||
return peers
|
||||
}
|
||||
|
||||
func (sc *schedule) peersSlowerThan(minSpeed int64) []p2p.ID {
|
||||
peers := []p2p.ID{}
|
||||
for _, peer := range sc.peers {
|
||||
if peer.lastRate < minSpeed {
|
||||
peers = append(peers, peer.peerID)
|
||||
}
|
||||
}
|
||||
|
||||
return peers
|
||||
}
|
||||
|
||||
func (sc *schedule) setStateAtHeight(height int64, state blockState) {
|
||||
sc.blockStates[height] = state
|
||||
}
|
||||
|
||||
func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error {
|
||||
peer, ok := sc.peers[peerID]
|
||||
if !ok {
|
||||
return fmt.Errorf("Can't find peer %s", peerID)
|
||||
}
|
||||
|
||||
if peer.state == peerStateRemoved {
|
||||
return fmt.Errorf("Cannot receive blocks from removed peer %s", peerID)
|
||||
}
|
||||
|
||||
if state := sc.getStateAtHeight(height); state != blockStatePending || sc.pendingBlocks[height] != peerID {
|
||||
return fmt.Errorf("Received block %d from peer %s without being requested", height, peerID)
|
||||
}
|
||||
|
||||
pendingTime, ok := sc.pendingTime[height]
|
||||
if !ok || now.Sub(pendingTime) <= 0 {
|
||||
return fmt.Errorf("Clock error. Block %d received at %s but requested at %s",
|
||||
height, pendingTime, now)
|
||||
}
|
||||
|
||||
peer.lastRate = size / int64(now.Sub(pendingTime).Seconds())
|
||||
|
||||
sc.setStateAtHeight(height, blockStateReceived)
|
||||
delete(sc.pendingBlocks, height)
|
||||
delete(sc.pendingTime, height)
|
||||
|
||||
sc.receivedBlocks[height] = peerID
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sc *schedule) markPending(peerID p2p.ID, height int64, time time.Time) error {
|
||||
peer, ok := sc.peers[peerID]
|
||||
if !ok {
|
||||
return fmt.Errorf("Can't find peer %s", peerID)
|
||||
}
|
||||
|
||||
state := sc.getStateAtHeight(height)
|
||||
if state != blockStateNew {
|
||||
return fmt.Errorf("Block %d should be in blockStateNew but was %s", height, state)
|
||||
}
|
||||
|
||||
if peer.state != peerStateReady {
|
||||
return fmt.Errorf("Cannot schedule %d from %s in %s", height, peerID, peer.state)
|
||||
}
|
||||
|
||||
if height > peer.height {
|
||||
return fmt.Errorf("Cannot request height %d from peer %s who is at height %d",
|
||||
height, peerID, peer.height)
|
||||
}
|
||||
|
||||
sc.setStateAtHeight(height, blockStatePending)
|
||||
sc.pendingBlocks[height] = peerID
|
||||
// XXX: to make this more accurate we can introduce a message from
|
||||
// the IO routine which indicates the time the request was put on the wire
|
||||
sc.pendingTime[height] = time
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sc *schedule) markProcessed(height int64) error {
|
||||
state := sc.getStateAtHeight(height)
|
||||
if state != blockStateReceived {
|
||||
return fmt.Errorf("Can't mark height %d received from block state %s", height, state)
|
||||
}
|
||||
|
||||
delete(sc.receivedBlocks, height)
|
||||
|
||||
sc.setStateAtHeight(height, blockStateProcessed)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// allBlockProcessed returns true if all blocks are in blockStateProcessed and
|
||||
// determines if the schedule has been completed
|
||||
func (sc *schedule) allBlocksProcessed() bool {
|
||||
for _, state := range sc.blockStates {
|
||||
if state != blockStateProcessed {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// highest block | state == blockStateNew
|
||||
func (sc *schedule) maxHeight() int64 {
|
||||
var max int64 = 0
|
||||
for height, state := range sc.blockStates {
|
||||
if state == blockStateNew && height > max {
|
||||
max = height
|
||||
}
|
||||
}
|
||||
|
||||
return max
|
||||
}
|
||||
|
||||
// lowest block | state == blockStateNew
|
||||
func (sc *schedule) minHeight() int64 {
|
||||
var min int64 = math.MaxInt64
|
||||
for height, state := range sc.blockStates {
|
||||
if state == blockStateNew && height < min {
|
||||
min = height
|
||||
}
|
||||
}
|
||||
|
||||
return min
|
||||
}
|
||||
|
||||
func (sc *schedule) pendingFrom(peerID p2p.ID) []int64 {
|
||||
heights := []int64{}
|
||||
for height, pendingPeerID := range sc.pendingBlocks {
|
||||
if pendingPeerID == peerID {
|
||||
heights = append(heights, height)
|
||||
}
|
||||
}
|
||||
return heights
|
||||
}
|
||||
|
||||
func (sc *schedule) selectPeer(peers []*scPeer) *scPeer {
|
||||
// FIXME: properPeerSelector
|
||||
s := rand.NewSource(time.Now().Unix())
|
||||
r := rand.New(s)
|
||||
|
||||
return peers[r.Intn(len(peers))]
|
||||
}
|
||||
|
||||
// XXX: this duplicates the logic of peersInactiveSince and peersSlowerThan
|
||||
func (sc *schedule) prunablePeers(peerTimout time.Duration, minRecvRate int64, now time.Time) []p2p.ID {
|
||||
prunable := []p2p.ID{}
|
||||
for peerID, peer := range sc.peers {
|
||||
if now.Sub(peer.lastTouched) > peerTimout || peer.lastRate < minRecvRate {
|
||||
prunable = append(prunable, peerID)
|
||||
}
|
||||
}
|
||||
|
||||
return prunable
|
||||
}
|
||||
|
||||
func (sc *schedule) numBlockInState(targetState blockState) uint32 {
|
||||
var num uint32 = 0
|
||||
for _, state := range sc.blockStates {
|
||||
if state == targetState {
|
||||
num++
|
||||
}
|
||||
}
|
||||
return num
|
||||
}
|
272
blockchain/v2/schedule_test.go
Normal file
272
blockchain/v2/schedule_test.go
Normal file
@@ -0,0 +1,272 @@
|
||||
package v2
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
)
|
||||
|
||||
func TestScheduleInit(t *testing.T) {
|
||||
var (
|
||||
initHeight int64 = 5
|
||||
sc = newSchedule(initHeight)
|
||||
)
|
||||
|
||||
assert.Equal(t, blockStateNew, sc.getStateAtHeight(initHeight))
|
||||
assert.Equal(t, blockStateProcessed, sc.getStateAtHeight(initHeight-1))
|
||||
assert.Equal(t, blockStateUnknown, sc.getStateAtHeight(initHeight+1))
|
||||
}
|
||||
|
||||
func TestAddPeer(t *testing.T) {
|
||||
var (
|
||||
initHeight int64 = 5
|
||||
peerID p2p.ID = "1"
|
||||
peerIDTwo p2p.ID = "2"
|
||||
sc = newSchedule(initHeight)
|
||||
)
|
||||
|
||||
assert.Nil(t, sc.addPeer(peerID))
|
||||
assert.Nil(t, sc.addPeer(peerIDTwo))
|
||||
assert.Error(t, sc.addPeer(peerID))
|
||||
}
|
||||
|
||||
func TestTouchPeer(t *testing.T) {
|
||||
var (
|
||||
initHeight int64 = 5
|
||||
peerID p2p.ID = "1"
|
||||
sc = newSchedule(initHeight)
|
||||
now = time.Now()
|
||||
)
|
||||
|
||||
assert.Error(t, sc.touchPeer(peerID, now),
|
||||
"Touching an unknown peer should return errPeerNotFound")
|
||||
|
||||
assert.Nil(t, sc.addPeer(peerID),
|
||||
"Adding a peer should return no error")
|
||||
assert.Nil(t, sc.touchPeer(peerID, now),
|
||||
"Touching a peer should return no error")
|
||||
|
||||
threshold := 10 * time.Second
|
||||
assert.Empty(t, sc.peersInactiveSince(threshold, now.Add(9*time.Second)),
|
||||
"Expected no peers to have been touched over 9 seconds")
|
||||
assert.Containsf(t, sc.peersInactiveSince(threshold, now.Add(11*time.Second)), peerID,
|
||||
"Expected one %s to have been touched over 10 seconds ago", peerID)
|
||||
}
|
||||
|
||||
func TestPeerHeight(t *testing.T) {
|
||||
var (
|
||||
initHeight int64 = 5
|
||||
peerID p2p.ID = "1"
|
||||
peerHeight int64 = 20
|
||||
sc = newSchedule(initHeight)
|
||||
)
|
||||
|
||||
assert.NoError(t, sc.addPeer(peerID),
|
||||
"Adding a peer should return no error")
|
||||
assert.NoError(t, sc.setPeerHeight(peerID, peerHeight))
|
||||
for i := initHeight; i <= peerHeight; i++ {
|
||||
assert.Equal(t, sc.getStateAtHeight(i), blockStateNew,
|
||||
"Expected all blocks to be in blockStateNew")
|
||||
peerIDs := []p2p.ID{}
|
||||
for _, peer := range sc.getPeersAtHeight(i) {
|
||||
peerIDs = append(peerIDs, peer.peerID)
|
||||
}
|
||||
|
||||
assert.Containsf(t, peerIDs, peerID,
|
||||
"Expected %s to have block %d", peerID, i)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTransitionPending(t *testing.T) {
|
||||
var (
|
||||
initHeight int64 = 5
|
||||
peerID p2p.ID = "1"
|
||||
peerIDTwo p2p.ID = "2"
|
||||
peerHeight int64 = 20
|
||||
sc = newSchedule(initHeight)
|
||||
now = time.Now()
|
||||
)
|
||||
|
||||
assert.NoError(t, sc.addPeer(peerID),
|
||||
"Adding a peer should return no error")
|
||||
assert.Nil(t, sc.addPeer(peerIDTwo),
|
||||
"Adding a peer should return no error")
|
||||
|
||||
assert.Error(t, sc.markPending(peerID, peerHeight, now),
|
||||
"Expected scheduling a block from a peer in peerStateNew to fail")
|
||||
|
||||
assert.NoError(t, sc.setPeerHeight(peerID, peerHeight),
|
||||
"Expected setPeerHeight to return no error")
|
||||
assert.NoError(t, sc.setPeerHeight(peerIDTwo, peerHeight),
|
||||
"Expected setPeerHeight to return no error")
|
||||
|
||||
assert.NoError(t, sc.markPending(peerID, peerHeight, now),
|
||||
"Expected markingPending new block to succeed")
|
||||
assert.Error(t, sc.markPending(peerIDTwo, peerHeight, now),
|
||||
"Expected markingPending by a second peer to fail")
|
||||
|
||||
assert.Equal(t, blockStatePending, sc.getStateAtHeight(peerHeight),
|
||||
"Expected the block to to be in blockStatePending")
|
||||
|
||||
assert.NoError(t, sc.removePeer(peerID),
|
||||
"Expected removePeer to return no error")
|
||||
|
||||
assert.Equal(t, blockStateNew, sc.getStateAtHeight(peerHeight),
|
||||
"Expected the block to to be in blockStateNew")
|
||||
|
||||
assert.Error(t, sc.markPending(peerID, peerHeight, now),
|
||||
"Expected markingPending removed peer to fail")
|
||||
|
||||
assert.NoError(t, sc.markPending(peerIDTwo, peerHeight, now),
|
||||
"Expected markingPending on a ready peer to succeed")
|
||||
|
||||
assert.Equal(t, blockStatePending, sc.getStateAtHeight(peerHeight),
|
||||
"Expected the block to to be in blockStatePending")
|
||||
}
|
||||
|
||||
func TestTransitionReceived(t *testing.T) {
|
||||
var (
|
||||
initHeight int64 = 5
|
||||
peerID p2p.ID = "1"
|
||||
peerIDTwo p2p.ID = "2"
|
||||
peerHeight int64 = 20
|
||||
blockSize int64 = 1024
|
||||
sc = newSchedule(initHeight)
|
||||
now = time.Now()
|
||||
receivedAt = now.Add(1 * time.Second)
|
||||
)
|
||||
|
||||
assert.NoError(t, sc.addPeer(peerID),
|
||||
"Expected adding peer %s to succeed", peerID)
|
||||
assert.NoError(t, sc.addPeer(peerIDTwo),
|
||||
"Expected adding peer %s to succeed", peerIDTwo)
|
||||
assert.NoError(t, sc.setPeerHeight(peerID, peerHeight),
|
||||
"Expected setPeerHeight to return no error")
|
||||
assert.NoErrorf(t, sc.setPeerHeight(peerIDTwo, peerHeight),
|
||||
"Expected setPeerHeight on %s to %d to succeed", peerIDTwo, peerHeight)
|
||||
assert.NoError(t, sc.markPending(peerID, initHeight, now),
|
||||
"Expected markingPending new block to succeed")
|
||||
|
||||
assert.Error(t, sc.markReceived(peerIDTwo, initHeight, blockSize, receivedAt),
|
||||
"Expected marking markReceived from a non requesting peer to fail")
|
||||
|
||||
assert.NoError(t, sc.markReceived(peerID, initHeight, blockSize, receivedAt),
|
||||
"Expected marking markReceived on a pending block to succeed")
|
||||
|
||||
assert.Error(t, sc.markReceived(peerID, initHeight, blockSize, receivedAt),
|
||||
"Expected marking markReceived on received block to fail")
|
||||
|
||||
assert.Equalf(t, blockStateReceived, sc.getStateAtHeight(initHeight),
|
||||
"Expected block %d to be blockHeightReceived", initHeight)
|
||||
|
||||
assert.NoErrorf(t, sc.removePeer(peerID),
|
||||
"Expected removePeer removing %s to succeed", peerID)
|
||||
|
||||
assert.Equalf(t, blockStateNew, sc.getStateAtHeight(initHeight),
|
||||
"Expected block %d to be blockStateNew", initHeight)
|
||||
|
||||
assert.NoErrorf(t, sc.markPending(peerIDTwo, initHeight, now),
|
||||
"Expected markingPending %d from %s to succeed", initHeight, peerIDTwo)
|
||||
assert.NoErrorf(t, sc.markReceived(peerIDTwo, initHeight, blockSize, receivedAt),
|
||||
"Expected marking markReceived %d from %s to succeed", initHeight, peerIDTwo)
|
||||
assert.Equalf(t, blockStateReceived, sc.getStateAtHeight(initHeight),
|
||||
"Expected block %d to be blockStateReceived", initHeight)
|
||||
}
|
||||
|
||||
func TestTransitionProcessed(t *testing.T) {
|
||||
var (
|
||||
initHeight int64 = 5
|
||||
peerID p2p.ID = "1"
|
||||
peerHeight int64 = 20
|
||||
blockSize int64 = 1024
|
||||
sc = newSchedule(initHeight)
|
||||
now = time.Now()
|
||||
receivedAt = now.Add(1 * time.Second)
|
||||
)
|
||||
|
||||
assert.NoError(t, sc.addPeer(peerID),
|
||||
"Expected adding peer %s to succeed", peerID)
|
||||
assert.NoErrorf(t, sc.setPeerHeight(peerID, peerHeight),
|
||||
"Expected setPeerHeight on %s to %d to succeed", peerID, peerHeight)
|
||||
assert.NoError(t, sc.markPending(peerID, initHeight, now),
|
||||
"Expected markingPending new block to succeed")
|
||||
assert.NoError(t, sc.markReceived(peerID, initHeight, blockSize, receivedAt),
|
||||
"Expected marking markReceived on a pending block to succeed")
|
||||
|
||||
assert.Error(t, sc.markProcessed(initHeight+1),
|
||||
"Expected marking %d as processed to fail", initHeight+1)
|
||||
assert.NoError(t, sc.markProcessed(initHeight),
|
||||
"Expected marking %d as processed to succeed", initHeight)
|
||||
|
||||
assert.Equalf(t, blockStateProcessed, sc.getStateAtHeight(initHeight),
|
||||
"Expected block %d to be blockStateProcessed", initHeight)
|
||||
|
||||
assert.NoError(t, sc.removePeer(peerID),
|
||||
"Expected removing peer %s to succeed", peerID)
|
||||
|
||||
assert.Equalf(t, blockStateProcessed, sc.getStateAtHeight(initHeight),
|
||||
"Expected block %d to be blockStateProcessed", initHeight)
|
||||
}
|
||||
|
||||
func TestMinMaxHeight(t *testing.T) {
|
||||
var (
|
||||
initHeight int64 = 5
|
||||
peerID p2p.ID = "1"
|
||||
peerHeight int64 = 20
|
||||
sc = newSchedule(initHeight)
|
||||
now = time.Now()
|
||||
)
|
||||
|
||||
assert.Equal(t, initHeight, sc.minHeight(),
|
||||
"Expected min height to be the initialized height")
|
||||
|
||||
assert.Equal(t, initHeight, sc.maxHeight(),
|
||||
"Expected max height to be the initialized height")
|
||||
|
||||
assert.NoError(t, sc.addPeer(peerID),
|
||||
"Adding a peer should return no error")
|
||||
|
||||
assert.NoError(t, sc.setPeerHeight(peerID, peerHeight),
|
||||
"Expected setPeerHeight to return no error")
|
||||
|
||||
assert.Equal(t, peerHeight, sc.maxHeight(),
|
||||
"Expected max height to increase to peerHeight")
|
||||
|
||||
assert.Nil(t, sc.markPending(peerID, initHeight, now.Add(1*time.Second)),
|
||||
"Expected marking initHeight as pending to return no error")
|
||||
|
||||
assert.Equal(t, initHeight+1, sc.minHeight(),
|
||||
"Expected marking initHeight as pending to move minHeight forward")
|
||||
}
|
||||
|
||||
func TestPeersSlowerThan(t *testing.T) {
|
||||
var (
|
||||
initHeight int64 = 5
|
||||
peerID p2p.ID = "1"
|
||||
peerHeight int64 = 20
|
||||
blockSize int64 = 1024
|
||||
sc = newSchedule(initHeight)
|
||||
now = time.Now()
|
||||
receivedAt = now.Add(1 * time.Second)
|
||||
)
|
||||
|
||||
assert.NoError(t, sc.addPeer(peerID),
|
||||
"Adding a peer should return no error")
|
||||
|
||||
assert.NoError(t, sc.setPeerHeight(peerID, peerHeight),
|
||||
"Expected setPeerHeight to return no error")
|
||||
|
||||
assert.NoError(t, sc.markPending(peerID, peerHeight, now),
|
||||
"Expected markingPending on to return no error")
|
||||
|
||||
assert.NoError(t, sc.markReceived(peerID, peerHeight, blockSize, receivedAt),
|
||||
"Expected markingPending on to return no error")
|
||||
|
||||
assert.Empty(t, sc.peersSlowerThan(blockSize-1),
|
||||
"expected no peers to be slower than blockSize-1 bytes/sec")
|
||||
|
||||
assert.Containsf(t, sc.peersSlowerThan(blockSize+1), peerID,
|
||||
"expected %s to be slower than blockSize+1 bytes/sec", peerID)
|
||||
}
|
@@ -48,15 +48,17 @@ func main() {
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
rs := privval.NewSignerServiceEndpoint(logger, *chainID, pv, dialer)
|
||||
err := rs.Start()
|
||||
sd := privval.NewSignerDialerEndpoint(logger, dialer)
|
||||
ss := privval.NewSignerServer(sd, *chainID, pv)
|
||||
|
||||
err := ss.Start()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Stop upon receiving SIGTERM or CTRL-C.
|
||||
cmn.TrapSignal(logger, func() {
|
||||
err := rs.Stop()
|
||||
err := ss.Stop()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@@ -294,6 +294,9 @@ max_txs_bytes = {{ .Mempool.MaxTxsBytes }}
|
||||
# Size of the cache (used to filter transactions we saw earlier) in transactions
|
||||
cache_size = {{ .Mempool.CacheSize }}
|
||||
|
||||
# Limit the size of TxMessage
|
||||
max_msg_bytes = {{ .Mempool.MaxMsgBytes }}
|
||||
|
||||
##### fast sync configuration options #####
|
||||
[fastsync]
|
||||
|
||||
@@ -302,9 +305,6 @@ cache_size = {{ .Mempool.CacheSize }}
|
||||
# 2) "v1" - refactor of v0 version for better testability
|
||||
version = "{{ .FastSync.Version }}"
|
||||
|
||||
# Limit the size of TxMessage
|
||||
max_msg_bytes = {{ .Mempool.MaxMsgBytes }}
|
||||
|
||||
##### consensus configuration options #####
|
||||
[consensus]
|
||||
|
||||
|
@@ -1518,9 +1518,11 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerID p2p.ID) (bool, err
|
||||
cs.evpool.AddEvidence(voteErr.DuplicateVoteEvidence)
|
||||
return added, err
|
||||
} else {
|
||||
// Probably an invalid signature / Bad peer.
|
||||
// Seems this can also err sometimes with "Unexpected step" - perhaps not from a bad peer ?
|
||||
cs.Logger.Error("Error attempting to add vote", "err", err)
|
||||
// Either
|
||||
// 1) bad peer OR
|
||||
// 2) not a bad peer? this can also err sometimes with "Unexpected step" OR
|
||||
// 3) tmkms use with multiple validators connecting to a single tmkms instance (https://github.com/tendermint/tendermint/issues/3839).
|
||||
cs.Logger.Info("Error attempting to add vote", "err", err)
|
||||
return added, ErrAddingVote
|
||||
}
|
||||
}
|
||||
|
@@ -240,6 +240,9 @@ max_txs_bytes = 1073741824
|
||||
# Size of the cache (used to filter transactions we saw earlier) in transactions
|
||||
cache_size = 10000
|
||||
|
||||
# Limit the size of TxMessage
|
||||
max_msg_bytes = 1048576
|
||||
|
||||
##### fast sync configuration options #####
|
||||
[fastsync]
|
||||
|
||||
@@ -248,9 +251,6 @@ cache_size = 10000
|
||||
# 2) "v1" - refactor of v0 version for better testability
|
||||
version = "v0"
|
||||
|
||||
# Limit the size of TxMessage
|
||||
max_msg_bytes = 1048576
|
||||
|
||||
##### consensus configuration options #####
|
||||
[consensus]
|
||||
|
||||
|
39
node/node.go
39
node/node.go
@@ -23,7 +23,7 @@ import (
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/consensus"
|
||||
cs "github.com/tendermint/tendermint/consensus"
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/evidence"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
@@ -278,9 +278,7 @@ func doHandshake(stateDB dbm.DB, state sm.State, blockStore sm.BlockStore,
|
||||
return nil
|
||||
}
|
||||
|
||||
func logNodeStartupInfo(state sm.State, privValidator types.PrivValidator, logger,
|
||||
consensusLogger log.Logger) {
|
||||
|
||||
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) {
|
||||
// Log the version info.
|
||||
logger.Info("Version info",
|
||||
"software", version.TMCoreSemVer,
|
||||
@@ -296,7 +294,6 @@ func logNodeStartupInfo(state sm.State, privValidator types.PrivValidator, logge
|
||||
)
|
||||
}
|
||||
|
||||
pubKey := privValidator.GetPubKey()
|
||||
addr := pubKey.Address()
|
||||
// Log whether this node is a validator or an observer
|
||||
if state.Validators.HasAddress(addr) {
|
||||
@@ -601,7 +598,13 @@ func NewNode(config *cfg.Config,
|
||||
}
|
||||
}
|
||||
|
||||
logNodeStartupInfo(state, privValidator, logger, consensusLogger)
|
||||
pubKey := privValidator.GetPubKey()
|
||||
if pubKey == nil {
|
||||
// TODO: GetPubKey should return errors - https://github.com/tendermint/tendermint/issues/3602
|
||||
return nil, errors.New("could not retrieve public key from private validator")
|
||||
}
|
||||
|
||||
logNodeStartupInfo(state, pubKey, logger, consensusLogger)
|
||||
|
||||
// Decide whether to fast-sync or not
|
||||
// We don't fast-sync when the only validator is us.
|
||||
@@ -1158,29 +1161,13 @@ func createAndStartPrivValidatorSocketClient(
|
||||
listenAddr string,
|
||||
logger log.Logger,
|
||||
) (types.PrivValidator, error) {
|
||||
var listener net.Listener
|
||||
|
||||
protocol, address := cmn.ProtocolAndAddress(listenAddr)
|
||||
ln, err := net.Listen(protocol, address)
|
||||
pve, err := privval.NewSignerListener(listenAddr, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch protocol {
|
||||
case "unix":
|
||||
listener = privval.NewUnixListener(ln)
|
||||
case "tcp":
|
||||
// TODO: persist this key so external signer
|
||||
// can actually authenticate us
|
||||
listener = privval.NewTCPListener(ln, ed25519.GenPrivKey())
|
||||
default:
|
||||
return nil, fmt.Errorf(
|
||||
"wrong listen address: expected either 'tcp' or 'unix' protocols, got %s",
|
||||
protocol,
|
||||
)
|
||||
return nil, errors.Wrap(err, "failed to start private validator")
|
||||
}
|
||||
|
||||
pvsc := privval.NewSignerValidatorEndpoint(logger.With("module", "privval"), listener)
|
||||
if err := pvsc.Start(); err != nil {
|
||||
pvsc, err := privval.NewSignerClient(pve)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to start private validator")
|
||||
}
|
||||
|
||||
|
@@ -136,25 +136,29 @@ func TestNodeSetPrivValTCP(t *testing.T) {
|
||||
config.BaseConfig.PrivValidatorListenAddr = addr
|
||||
|
||||
dialer := privval.DialTCPFn(addr, 100*time.Millisecond, ed25519.GenPrivKey())
|
||||
pvsc := privval.NewSignerServiceEndpoint(
|
||||
dialerEndpoint := privval.NewSignerDialerEndpoint(
|
||||
log.TestingLogger(),
|
||||
config.ChainID(),
|
||||
types.NewMockPV(),
|
||||
dialer,
|
||||
)
|
||||
privval.SignerServiceEndpointTimeoutReadWrite(100 * time.Millisecond)(pvsc)
|
||||
privval.SignerDialerEndpointTimeoutReadWrite(100 * time.Millisecond)(dialerEndpoint)
|
||||
|
||||
signerServer := privval.NewSignerServer(
|
||||
dialerEndpoint,
|
||||
config.ChainID(),
|
||||
types.NewMockPV(),
|
||||
)
|
||||
|
||||
go func() {
|
||||
err := pvsc.Start()
|
||||
err := signerServer.Start()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
defer pvsc.Stop()
|
||||
defer signerServer.Stop()
|
||||
|
||||
n, err := DefaultNewNode(config, log.TestingLogger())
|
||||
require.NoError(t, err)
|
||||
assert.IsType(t, &privval.SignerValidatorEndpoint{}, n.PrivValidator())
|
||||
assert.IsType(t, &privval.SignerClient{}, n.PrivValidator())
|
||||
}
|
||||
|
||||
// address without a protocol must result in error
|
||||
@@ -178,13 +182,17 @@ func TestNodeSetPrivValIPC(t *testing.T) {
|
||||
config.BaseConfig.PrivValidatorListenAddr = "unix://" + tmpfile
|
||||
|
||||
dialer := privval.DialUnixFn(tmpfile)
|
||||
pvsc := privval.NewSignerServiceEndpoint(
|
||||
dialerEndpoint := privval.NewSignerDialerEndpoint(
|
||||
log.TestingLogger(),
|
||||
config.ChainID(),
|
||||
types.NewMockPV(),
|
||||
dialer,
|
||||
)
|
||||
privval.SignerServiceEndpointTimeoutReadWrite(100 * time.Millisecond)(pvsc)
|
||||
privval.SignerDialerEndpointTimeoutReadWrite(100 * time.Millisecond)(dialerEndpoint)
|
||||
|
||||
pvsc := privval.NewSignerServer(
|
||||
dialerEndpoint,
|
||||
config.ChainID(),
|
||||
types.NewMockPV(),
|
||||
)
|
||||
|
||||
go func() {
|
||||
err := pvsc.Start()
|
||||
@@ -194,8 +202,7 @@ func TestNodeSetPrivValIPC(t *testing.T) {
|
||||
|
||||
n, err := DefaultNewNode(config, log.TestingLogger())
|
||||
require.NoError(t, err)
|
||||
assert.IsType(t, &privval.SignerValidatorEndpoint{}, n.PrivValidator())
|
||||
|
||||
assert.IsType(t, &privval.SignerClient{}, n.PrivValidator())
|
||||
}
|
||||
|
||||
// testFreeAddr claims a free port so we don't block on listener being ready.
|
||||
|
@@ -6,16 +6,16 @@ FilePV
|
||||
|
||||
FilePV is the simplest implementation and developer default. It uses one file for the private key and another to store state.
|
||||
|
||||
SignerValidatorEndpoint
|
||||
SignerListenerEndpoint
|
||||
|
||||
SignerValidatorEndpoint establishes a connection to an external process, like a Key Management Server (KMS), using a socket.
|
||||
SignerValidatorEndpoint listens for the external KMS process to dial in.
|
||||
SignerValidatorEndpoint takes a listener, which determines the type of connection
|
||||
SignerListenerEndpoint establishes a connection to an external process, like a Key Management Server (KMS), using a socket.
|
||||
SignerListenerEndpoint listens for the external KMS process to dial in.
|
||||
SignerListenerEndpoint takes a listener, which determines the type of connection
|
||||
(ie. encrypted over tcp, or unencrypted over unix).
|
||||
|
||||
SignerServiceEndpoint
|
||||
SignerDialerEndpoint
|
||||
|
||||
SignerServiceEndpoint is a simple wrapper around a net.Conn. It's used by both IPCVal and TCPVal.
|
||||
SignerDialerEndpoint is a simple wrapper around a net.Conn. It's used by both IPCVal and TCPVal.
|
||||
|
||||
*/
|
||||
package privval
|
||||
|
@@ -4,10 +4,21 @@ import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type EndpointTimeoutError struct{}
|
||||
|
||||
// Implement the net.Error interface.
|
||||
func (e EndpointTimeoutError) Error() string { return "endpoint connection timed out" }
|
||||
func (e EndpointTimeoutError) Timeout() bool { return true }
|
||||
func (e EndpointTimeoutError) Temporary() bool { return true }
|
||||
|
||||
// Socket errors.
|
||||
var (
|
||||
ErrUnexpectedResponse = fmt.Errorf("received unexpected response")
|
||||
ErrConnTimeout = fmt.Errorf("remote signer timed out")
|
||||
ErrNoConnection = fmt.Errorf("endpoint is not connected")
|
||||
ErrConnectionTimeout = EndpointTimeoutError{}
|
||||
|
||||
ErrReadTimeout = fmt.Errorf("endpoint read timed out")
|
||||
ErrWriteTimeout = fmt.Errorf("endpoint write timed out")
|
||||
)
|
||||
|
||||
// RemoteSignerError allows (remote) validators to include meaningful error descriptions in their reply.
|
||||
@@ -18,5 +29,5 @@ type RemoteSignerError struct {
|
||||
}
|
||||
|
||||
func (e *RemoteSignerError) Error() string {
|
||||
return fmt.Sprintf("signerServiceEndpoint returned error #%d: %s", e.Code, e.Description)
|
||||
return fmt.Sprintf("signerEndpoint returned error #%d: %s", e.Code, e.Description)
|
||||
}
|
||||
|
@@ -67,11 +67,11 @@ func assertEqualPV(t *testing.T, oldPV *privval.OldFilePV, newPV *privval.FilePV
|
||||
}
|
||||
|
||||
func initTmpOldFile(t *testing.T) string {
|
||||
tmpfile, err := ioutil.TempFile("", "priv_validator_*.json")
|
||||
tmpFile, err := ioutil.TempFile("", "priv_validator_*.json")
|
||||
require.NoError(t, err)
|
||||
t.Logf("created test file %s", tmpfile.Name())
|
||||
_, err = tmpfile.WriteString(oldPrivvalContent)
|
||||
t.Logf("created test file %s", tmpFile.Name())
|
||||
_, err = tmpFile.WriteString(oldPrivvalContent)
|
||||
require.NoError(t, err)
|
||||
|
||||
return tmpfile.Name()
|
||||
return tmpFile.Name()
|
||||
}
|
||||
|
@@ -58,7 +58,7 @@ func TestResetValidator(t *testing.T) {
|
||||
// priv val after signing is not same as empty
|
||||
assert.NotEqual(t, privVal.LastSignState, emptyState)
|
||||
|
||||
// priv val after reset is same as empty
|
||||
// priv val after AcceptNewConnection is same as empty
|
||||
privVal.Reset()
|
||||
assert.Equal(t, privVal.LastSignState, emptyState)
|
||||
}
|
||||
@@ -164,6 +164,7 @@ func TestSignVote(t *testing.T) {
|
||||
|
||||
block1 := types.BlockID{Hash: []byte{1, 2, 3}, PartsHeader: types.PartSetHeader{}}
|
||||
block2 := types.BlockID{Hash: []byte{3, 2, 1}, PartsHeader: types.PartSetHeader{}}
|
||||
|
||||
height, round := int64(10), 1
|
||||
voteType := byte(types.PrevoteType)
|
||||
|
||||
|
@@ -1,61 +1,64 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
amino "github.com/tendermint/go-amino"
|
||||
"github.com/tendermint/go-amino"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// RemoteSignerMsg is sent between SignerServiceEndpoint and the SignerServiceEndpoint client.
|
||||
type RemoteSignerMsg interface{}
|
||||
// SignerMessage is sent between Signer Clients and Servers.
|
||||
type SignerMessage interface{}
|
||||
|
||||
func RegisterRemoteSignerMsg(cdc *amino.Codec) {
|
||||
cdc.RegisterInterface((*RemoteSignerMsg)(nil), nil)
|
||||
cdc.RegisterInterface((*SignerMessage)(nil), nil)
|
||||
cdc.RegisterConcrete(&PubKeyRequest{}, "tendermint/remotesigner/PubKeyRequest", nil)
|
||||
cdc.RegisterConcrete(&PubKeyResponse{}, "tendermint/remotesigner/PubKeyResponse", nil)
|
||||
cdc.RegisterConcrete(&SignVoteRequest{}, "tendermint/remotesigner/SignVoteRequest", nil)
|
||||
cdc.RegisterConcrete(&SignedVoteResponse{}, "tendermint/remotesigner/SignedVoteResponse", nil)
|
||||
cdc.RegisterConcrete(&SignProposalRequest{}, "tendermint/remotesigner/SignProposalRequest", nil)
|
||||
cdc.RegisterConcrete(&SignedProposalResponse{}, "tendermint/remotesigner/SignedProposalResponse", nil)
|
||||
|
||||
cdc.RegisterConcrete(&PingRequest{}, "tendermint/remotesigner/PingRequest", nil)
|
||||
cdc.RegisterConcrete(&PingResponse{}, "tendermint/remotesigner/PingResponse", nil)
|
||||
}
|
||||
|
||||
// TODO: Add ChainIDRequest
|
||||
|
||||
// PubKeyRequest requests the consensus public key from the remote signer.
|
||||
type PubKeyRequest struct{}
|
||||
|
||||
// PubKeyResponse is a PrivValidatorSocket message containing the public key.
|
||||
// PubKeyResponse is a response message containing the public key.
|
||||
type PubKeyResponse struct {
|
||||
PubKey crypto.PubKey
|
||||
Error *RemoteSignerError
|
||||
}
|
||||
|
||||
// SignVoteRequest is a PrivValidatorSocket message containing a vote.
|
||||
// SignVoteRequest is a request to sign a vote
|
||||
type SignVoteRequest struct {
|
||||
Vote *types.Vote
|
||||
}
|
||||
|
||||
// SignedVoteResponse is a PrivValidatorSocket message containing a signed vote along with a potenial error message.
|
||||
// SignedVoteResponse is a response containing a signed vote or an error
|
||||
type SignedVoteResponse struct {
|
||||
Vote *types.Vote
|
||||
Error *RemoteSignerError
|
||||
}
|
||||
|
||||
// SignProposalRequest is a PrivValidatorSocket message containing a Proposal.
|
||||
// SignProposalRequest is a request to sign a proposal
|
||||
type SignProposalRequest struct {
|
||||
Proposal *types.Proposal
|
||||
}
|
||||
|
||||
// SignedProposalResponse is a PrivValidatorSocket message containing a proposal response
|
||||
// SignedProposalResponse is response containing a signed proposal or an error
|
||||
type SignedProposalResponse struct {
|
||||
Proposal *types.Proposal
|
||||
Error *RemoteSignerError
|
||||
}
|
||||
|
||||
// PingRequest is a PrivValidatorSocket message to keep the connection alive.
|
||||
// PingRequest is a request to confirm that the connection is alive.
|
||||
type PingRequest struct {
|
||||
}
|
||||
|
||||
// PingRequest is a PrivValidatorSocket response to keep the connection alive.
|
||||
// PingResponse is a response to confirm that the connection is alive.
|
||||
type PingResponse struct {
|
||||
}
|
||||
|
131
privval/signer_client.go
Normal file
131
privval/signer_client.go
Normal file
@@ -0,0 +1,131 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// SignerClient implements PrivValidator.
|
||||
// Handles remote validator connections that provide signing services
|
||||
type SignerClient struct {
|
||||
endpoint *SignerListenerEndpoint
|
||||
}
|
||||
|
||||
var _ types.PrivValidator = (*SignerClient)(nil)
|
||||
|
||||
// NewSignerClient returns an instance of SignerClient.
|
||||
// it will start the endpoint (if not already started)
|
||||
func NewSignerClient(endpoint *SignerListenerEndpoint) (*SignerClient, error) {
|
||||
if !endpoint.IsRunning() {
|
||||
if err := endpoint.Start(); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to start listener endpoint")
|
||||
}
|
||||
}
|
||||
|
||||
return &SignerClient{endpoint: endpoint}, nil
|
||||
}
|
||||
|
||||
// Close closes the underlying connection
|
||||
func (sc *SignerClient) Close() error {
|
||||
return sc.endpoint.Close()
|
||||
}
|
||||
|
||||
// IsConnected indicates with the signer is connected to a remote signing service
|
||||
func (sc *SignerClient) IsConnected() bool {
|
||||
return sc.endpoint.IsConnected()
|
||||
}
|
||||
|
||||
// WaitForConnection waits maxWait for a connection or returns a timeout error
|
||||
func (sc *SignerClient) WaitForConnection(maxWait time.Duration) error {
|
||||
return sc.endpoint.WaitForConnection(maxWait)
|
||||
}
|
||||
|
||||
//--------------------------------------------------------
|
||||
// Implement PrivValidator
|
||||
|
||||
// Ping sends a ping request to the remote signer
|
||||
func (sc *SignerClient) Ping() error {
|
||||
response, err := sc.endpoint.SendRequest(&PingRequest{})
|
||||
|
||||
if err != nil {
|
||||
sc.endpoint.Logger.Error("SignerClient::Ping", "err", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
_, ok := response.(*PingResponse)
|
||||
if !ok {
|
||||
sc.endpoint.Logger.Error("SignerClient::Ping", "err", "response != PingResponse")
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetPubKey retrieves a public key from a remote signer
|
||||
func (sc *SignerClient) GetPubKey() crypto.PubKey {
|
||||
response, err := sc.endpoint.SendRequest(&PubKeyRequest{})
|
||||
if err != nil {
|
||||
sc.endpoint.Logger.Error("SignerClient::GetPubKey", "err", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
pubKeyResp, ok := response.(*PubKeyResponse)
|
||||
if !ok {
|
||||
sc.endpoint.Logger.Error("SignerClient::GetPubKey", "err", "response != PubKeyResponse")
|
||||
return nil
|
||||
}
|
||||
|
||||
if pubKeyResp.Error != nil {
|
||||
sc.endpoint.Logger.Error("failed to get private validator's public key", "err", pubKeyResp.Error)
|
||||
return nil
|
||||
}
|
||||
|
||||
return pubKeyResp.PubKey
|
||||
}
|
||||
|
||||
// SignVote requests a remote signer to sign a vote
|
||||
func (sc *SignerClient) SignVote(chainID string, vote *types.Vote) error {
|
||||
response, err := sc.endpoint.SendRequest(&SignVoteRequest{Vote: vote})
|
||||
if err != nil {
|
||||
sc.endpoint.Logger.Error("SignerClient::SignVote", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
resp, ok := response.(*SignedVoteResponse)
|
||||
if !ok {
|
||||
sc.endpoint.Logger.Error("SignerClient::GetPubKey", "err", "response != SignedVoteResponse")
|
||||
return ErrUnexpectedResponse
|
||||
}
|
||||
|
||||
if resp.Error != nil {
|
||||
return resp.Error
|
||||
}
|
||||
*vote = *resp.Vote
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SignProposal requests a remote signer to sign a proposal
|
||||
func (sc *SignerClient) SignProposal(chainID string, proposal *types.Proposal) error {
|
||||
response, err := sc.endpoint.SendRequest(&SignProposalRequest{Proposal: proposal})
|
||||
if err != nil {
|
||||
sc.endpoint.Logger.Error("SignerClient::SignProposal", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
resp, ok := response.(*SignedProposalResponse)
|
||||
if !ok {
|
||||
sc.endpoint.Logger.Error("SignerClient::SignProposal", "err", "response != SignedProposalResponse")
|
||||
return ErrUnexpectedResponse
|
||||
}
|
||||
if resp.Error != nil {
|
||||
return resp.Error
|
||||
}
|
||||
*proposal = *resp.Proposal
|
||||
|
||||
return nil
|
||||
}
|
257
privval/signer_client_test.go
Normal file
257
privval/signer_client_test.go
Normal file
@@ -0,0 +1,257 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
type signerTestCase struct {
|
||||
chainID string
|
||||
mockPV types.PrivValidator
|
||||
signerClient *SignerClient
|
||||
signerServer *SignerServer
|
||||
}
|
||||
|
||||
func getSignerTestCases(t *testing.T) []signerTestCase {
|
||||
testCases := make([]signerTestCase, 0)
|
||||
|
||||
// Get test cases for each possible dialer (DialTCP / DialUnix / etc)
|
||||
for _, dtc := range getDialerTestCases(t) {
|
||||
chainID := common.RandStr(12)
|
||||
mockPV := types.NewMockPV()
|
||||
|
||||
// get a pair of signer listener, signer dialer endpoints
|
||||
sl, sd := getMockEndpoints(t, dtc.addr, dtc.dialer)
|
||||
sc, err := NewSignerClient(sl)
|
||||
require.NoError(t, err)
|
||||
ss := NewSignerServer(sd, chainID, mockPV)
|
||||
|
||||
err = ss.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
tc := signerTestCase{
|
||||
chainID: chainID,
|
||||
mockPV: mockPV,
|
||||
signerClient: sc,
|
||||
signerServer: ss,
|
||||
}
|
||||
|
||||
testCases = append(testCases, tc)
|
||||
}
|
||||
|
||||
return testCases
|
||||
}
|
||||
|
||||
func TestSignerClose(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
err := tc.signerClient.Close()
|
||||
assert.NoError(t, err)
|
||||
|
||||
err = tc.signerServer.Stop()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerPing(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
err := tc.signerClient.Ping()
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerGetPubKey(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
pubKey := tc.signerClient.GetPubKey()
|
||||
expectedPubKey := tc.mockPV.GetPubKey()
|
||||
|
||||
assert.Equal(t, expectedPubKey, pubKey)
|
||||
|
||||
addr := tc.signerClient.GetPubKey().Address()
|
||||
expectedAddr := tc.mockPV.GetPubKey().Address()
|
||||
|
||||
assert.Equal(t, expectedAddr, addr)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerProposal(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
ts := time.Now()
|
||||
want := &types.Proposal{Timestamp: ts}
|
||||
have := &types.Proposal{Timestamp: ts}
|
||||
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
require.NoError(t, tc.mockPV.SignProposal(tc.chainID, want))
|
||||
require.NoError(t, tc.signerClient.SignProposal(tc.chainID, have))
|
||||
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerVote(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
ts := time.Now()
|
||||
want := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
|
||||
have := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
|
||||
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
require.NoError(t, tc.mockPV.SignVote(tc.chainID, want))
|
||||
require.NoError(t, tc.signerClient.SignVote(tc.chainID, have))
|
||||
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerVoteResetDeadline(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
ts := time.Now()
|
||||
want := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
|
||||
have := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
|
||||
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
time.Sleep(testTimeoutReadWrite2o3)
|
||||
|
||||
require.NoError(t, tc.mockPV.SignVote(tc.chainID, want))
|
||||
require.NoError(t, tc.signerClient.SignVote(tc.chainID, have))
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
|
||||
// TODO(jleni): Clarify what is actually being tested
|
||||
|
||||
// This would exceed the deadline if it was not extended by the previous message
|
||||
time.Sleep(testTimeoutReadWrite2o3)
|
||||
|
||||
require.NoError(t, tc.mockPV.SignVote(tc.chainID, want))
|
||||
require.NoError(t, tc.signerClient.SignVote(tc.chainID, have))
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerVoteKeepAlive(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
ts := time.Now()
|
||||
want := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
|
||||
have := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
|
||||
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
// Check that even if the client does not request a
|
||||
// signature for a long time. The service is still available
|
||||
|
||||
// in this particular case, we use the dialer logger to ensure that
|
||||
// test messages are properly interleaved in the test logs
|
||||
tc.signerServer.Logger.Debug("TEST: Forced Wait -------------------------------------------------")
|
||||
time.Sleep(testTimeoutReadWrite * 3)
|
||||
tc.signerServer.Logger.Debug("TEST: Forced Wait DONE---------------------------------------------")
|
||||
|
||||
require.NoError(t, tc.mockPV.SignVote(tc.chainID, want))
|
||||
require.NoError(t, tc.signerClient.SignVote(tc.chainID, have))
|
||||
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerSignProposalErrors(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
// Replace service with a mock that always fails
|
||||
tc.signerServer.privVal = types.NewErroringMockPV()
|
||||
tc.mockPV = types.NewErroringMockPV()
|
||||
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
ts := time.Now()
|
||||
proposal := &types.Proposal{Timestamp: ts}
|
||||
err := tc.signerClient.SignProposal(tc.chainID, proposal)
|
||||
require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
|
||||
|
||||
err = tc.mockPV.SignProposal(tc.chainID, proposal)
|
||||
require.Error(t, err)
|
||||
|
||||
err = tc.signerClient.SignProposal(tc.chainID, proposal)
|
||||
require.Error(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSignerSignVoteErrors(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
ts := time.Now()
|
||||
vote := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
|
||||
|
||||
// Replace signer service privval with one that always fails
|
||||
tc.signerServer.privVal = types.NewErroringMockPV()
|
||||
tc.mockPV = types.NewErroringMockPV()
|
||||
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
err := tc.signerClient.SignVote(tc.chainID, vote)
|
||||
require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
|
||||
|
||||
err = tc.mockPV.SignVote(tc.chainID, vote)
|
||||
require.Error(t, err)
|
||||
|
||||
err = tc.signerClient.SignVote(tc.chainID, vote)
|
||||
require.Error(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func brokenHandler(privVal types.PrivValidator, request SignerMessage, chainID string) (SignerMessage, error) {
|
||||
var res SignerMessage
|
||||
var err error
|
||||
|
||||
switch r := request.(type) {
|
||||
|
||||
// This is broken and will answer most requests with a pubkey response
|
||||
case *PubKeyRequest:
|
||||
res = &PubKeyResponse{nil, nil}
|
||||
case *SignVoteRequest:
|
||||
res = &PubKeyResponse{nil, nil}
|
||||
case *SignProposalRequest:
|
||||
res = &PubKeyResponse{nil, nil}
|
||||
|
||||
case *PingRequest:
|
||||
err, res = nil, &PingResponse{}
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("unknown msg: %v", r)
|
||||
}
|
||||
|
||||
return res, err
|
||||
}
|
||||
|
||||
func TestSignerUnexpectedResponse(t *testing.T) {
|
||||
for _, tc := range getSignerTestCases(t) {
|
||||
tc.signerServer.privVal = types.NewMockPV()
|
||||
tc.mockPV = types.NewMockPV()
|
||||
|
||||
tc.signerServer.SetRequestHandler(brokenHandler)
|
||||
|
||||
defer tc.signerServer.Stop()
|
||||
defer tc.signerClient.Close()
|
||||
|
||||
ts := time.Now()
|
||||
want := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
|
||||
|
||||
e := tc.signerClient.SignVote(tc.chainID, want)
|
||||
assert.EqualError(t, e, "received unexpected response")
|
||||
}
|
||||
}
|
84
privval/signer_dialer_endpoint.go
Normal file
84
privval/signer_dialer_endpoint.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultMaxDialRetries = 10
|
||||
defaultRetryWaitMilliseconds = 100
|
||||
)
|
||||
|
||||
// SignerServiceEndpointOption sets an optional parameter on the SignerDialerEndpoint.
|
||||
type SignerServiceEndpointOption func(*SignerDialerEndpoint)
|
||||
|
||||
// SignerDialerEndpointTimeoutReadWrite sets the read and write timeout for connections
|
||||
// from external signing processes.
|
||||
func SignerDialerEndpointTimeoutReadWrite(timeout time.Duration) SignerServiceEndpointOption {
|
||||
return func(ss *SignerDialerEndpoint) { ss.timeoutReadWrite = timeout }
|
||||
}
|
||||
|
||||
// SignerDialerEndpointConnRetries sets the amount of attempted retries to acceptNewConnection.
|
||||
func SignerDialerEndpointConnRetries(retries int) SignerServiceEndpointOption {
|
||||
return func(ss *SignerDialerEndpoint) { ss.maxConnRetries = retries }
|
||||
}
|
||||
|
||||
// SignerDialerEndpoint dials using its dialer and responds to any
|
||||
// signature requests using its privVal.
|
||||
type SignerDialerEndpoint struct {
|
||||
signerEndpoint
|
||||
|
||||
dialer SocketDialer
|
||||
|
||||
retryWait time.Duration
|
||||
maxConnRetries int
|
||||
}
|
||||
|
||||
// NewSignerDialerEndpoint returns a SignerDialerEndpoint that will dial using the given
|
||||
// dialer and respond to any signature requests over the connection
|
||||
// using the given privVal.
|
||||
func NewSignerDialerEndpoint(
|
||||
logger log.Logger,
|
||||
dialer SocketDialer,
|
||||
) *SignerDialerEndpoint {
|
||||
|
||||
sd := &SignerDialerEndpoint{
|
||||
dialer: dialer,
|
||||
retryWait: defaultRetryWaitMilliseconds * time.Millisecond,
|
||||
maxConnRetries: defaultMaxDialRetries,
|
||||
}
|
||||
|
||||
sd.BaseService = *cmn.NewBaseService(logger, "SignerDialerEndpoint", sd)
|
||||
sd.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second
|
||||
|
||||
return sd
|
||||
}
|
||||
|
||||
func (sd *SignerDialerEndpoint) ensureConnection() error {
|
||||
if sd.IsConnected() {
|
||||
return nil
|
||||
}
|
||||
|
||||
retries := 0
|
||||
for retries < sd.maxConnRetries {
|
||||
conn, err := sd.dialer()
|
||||
|
||||
if err != nil {
|
||||
retries++
|
||||
sd.Logger.Debug("SignerDialer: Reconnection failed", "retries", retries, "max", sd.maxConnRetries, "err", err)
|
||||
// Wait between retries
|
||||
time.Sleep(sd.retryWait)
|
||||
} else {
|
||||
sd.SetConnection(conn)
|
||||
sd.Logger.Debug("SignerDialer: Connection Ready")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
sd.Logger.Debug("SignerDialer: Max retries exceeded", "retries", retries, "max", sd.maxConnRetries)
|
||||
|
||||
return ErrNoConnection
|
||||
}
|
156
privval/signer_endpoint.go
Normal file
156
privval/signer_endpoint.go
Normal file
@@ -0,0 +1,156 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultTimeoutReadWriteSeconds = 3
|
||||
)
|
||||
|
||||
type signerEndpoint struct {
|
||||
cmn.BaseService
|
||||
|
||||
connMtx sync.Mutex
|
||||
conn net.Conn
|
||||
|
||||
timeoutReadWrite time.Duration
|
||||
}
|
||||
|
||||
// Close closes the underlying net.Conn.
|
||||
func (se *signerEndpoint) Close() error {
|
||||
se.DropConnection()
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsConnected indicates if there is an active connection
|
||||
func (se *signerEndpoint) IsConnected() bool {
|
||||
se.connMtx.Lock()
|
||||
defer se.connMtx.Unlock()
|
||||
return se.isConnected()
|
||||
}
|
||||
|
||||
// TryGetConnection retrieves a connection if it is already available
|
||||
func (se *signerEndpoint) GetAvailableConnection(connectionAvailableCh chan net.Conn) bool {
|
||||
se.connMtx.Lock()
|
||||
defer se.connMtx.Unlock()
|
||||
|
||||
// Is there a connection ready?
|
||||
select {
|
||||
case se.conn = <-connectionAvailableCh:
|
||||
return true
|
||||
default:
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// TryGetConnection retrieves a connection if it is already available
|
||||
func (se *signerEndpoint) WaitConnection(connectionAvailableCh chan net.Conn, maxWait time.Duration) error {
|
||||
se.connMtx.Lock()
|
||||
defer se.connMtx.Unlock()
|
||||
|
||||
select {
|
||||
case se.conn = <-connectionAvailableCh:
|
||||
case <-time.After(maxWait):
|
||||
return ErrConnectionTimeout
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetConnection replaces the current connection object
|
||||
func (se *signerEndpoint) SetConnection(newConnection net.Conn) {
|
||||
se.connMtx.Lock()
|
||||
defer se.connMtx.Unlock()
|
||||
se.conn = newConnection
|
||||
}
|
||||
|
||||
// IsConnected indicates if there is an active connection
|
||||
func (se *signerEndpoint) DropConnection() {
|
||||
se.connMtx.Lock()
|
||||
defer se.connMtx.Unlock()
|
||||
se.dropConnection()
|
||||
}
|
||||
|
||||
// ReadMessage reads a message from the endpoint
|
||||
func (se *signerEndpoint) ReadMessage() (msg SignerMessage, err error) {
|
||||
se.connMtx.Lock()
|
||||
defer se.connMtx.Unlock()
|
||||
|
||||
if !se.isConnected() {
|
||||
return nil, fmt.Errorf("endpoint is not connected")
|
||||
}
|
||||
|
||||
// Reset read deadline
|
||||
deadline := time.Now().Add(se.timeoutReadWrite)
|
||||
|
||||
err = se.conn.SetReadDeadline(deadline)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
const maxRemoteSignerMsgSize = 1024 * 10
|
||||
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(se.conn, &msg, maxRemoteSignerMsgSize)
|
||||
if _, ok := err.(timeoutError); ok {
|
||||
if err != nil {
|
||||
err = errors.Wrap(ErrReadTimeout, err.Error())
|
||||
} else {
|
||||
err = errors.Wrap(ErrReadTimeout, "Empty error")
|
||||
}
|
||||
se.Logger.Debug("Dropping [read]", "obj", se)
|
||||
se.dropConnection()
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// WriteMessage writes a message from the endpoint
|
||||
func (se *signerEndpoint) WriteMessage(msg SignerMessage) (err error) {
|
||||
se.connMtx.Lock()
|
||||
defer se.connMtx.Unlock()
|
||||
|
||||
if !se.isConnected() {
|
||||
return errors.Wrap(ErrNoConnection, "endpoint is not connected")
|
||||
}
|
||||
|
||||
// Reset read deadline
|
||||
deadline := time.Now().Add(se.timeoutReadWrite)
|
||||
se.Logger.Debug("Write::Error Resetting deadline", "obj", se)
|
||||
|
||||
err = se.conn.SetWriteDeadline(deadline)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
_, err = cdc.MarshalBinaryLengthPrefixedWriter(se.conn, msg)
|
||||
if _, ok := err.(timeoutError); ok {
|
||||
if err != nil {
|
||||
err = errors.Wrap(ErrWriteTimeout, err.Error())
|
||||
} else {
|
||||
err = errors.Wrap(ErrWriteTimeout, "Empty error")
|
||||
}
|
||||
se.dropConnection()
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (se *signerEndpoint) isConnected() bool {
|
||||
return se.conn != nil
|
||||
}
|
||||
|
||||
func (se *signerEndpoint) dropConnection() {
|
||||
if se.conn != nil {
|
||||
if err := se.conn.Close(); err != nil {
|
||||
se.Logger.Error("signerEndpoint::dropConnection", "err", err)
|
||||
}
|
||||
se.conn = nil
|
||||
}
|
||||
}
|
198
privval/signer_listener_endpoint.go
Normal file
198
privval/signer_listener_endpoint.go
Normal file
@@ -0,0 +1,198 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
)
|
||||
|
||||
// SignerValidatorEndpointOption sets an optional parameter on the SocketVal.
|
||||
type SignerValidatorEndpointOption func(*SignerListenerEndpoint)
|
||||
|
||||
// SignerListenerEndpoint listens for an external process to dial in
|
||||
// and keeps the connection alive by dropping and reconnecting
|
||||
type SignerListenerEndpoint struct {
|
||||
signerEndpoint
|
||||
|
||||
listener net.Listener
|
||||
connectRequestCh chan struct{}
|
||||
connectionAvailableCh chan net.Conn
|
||||
|
||||
timeoutAccept time.Duration
|
||||
pingTimer *time.Ticker
|
||||
|
||||
instanceMtx sync.Mutex // Ensures instance public methods access, i.e. SendRequest
|
||||
}
|
||||
|
||||
// NewSignerListenerEndpoint returns an instance of SignerListenerEndpoint.
|
||||
func NewSignerListenerEndpoint(
|
||||
logger log.Logger,
|
||||
listener net.Listener,
|
||||
) *SignerListenerEndpoint {
|
||||
sc := &SignerListenerEndpoint{
|
||||
listener: listener,
|
||||
timeoutAccept: defaultTimeoutAcceptSeconds * time.Second,
|
||||
}
|
||||
|
||||
sc.BaseService = *cmn.NewBaseService(logger, "SignerListenerEndpoint", sc)
|
||||
sc.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second
|
||||
return sc
|
||||
}
|
||||
|
||||
// OnStart implements cmn.Service.
|
||||
func (sl *SignerListenerEndpoint) OnStart() error {
|
||||
sl.connectRequestCh = make(chan struct{})
|
||||
sl.connectionAvailableCh = make(chan net.Conn)
|
||||
|
||||
sl.pingTimer = time.NewTicker(defaultPingPeriodMilliseconds * time.Millisecond)
|
||||
|
||||
go sl.serviceLoop()
|
||||
go sl.pingLoop()
|
||||
|
||||
sl.connectRequestCh <- struct{}{}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements cmn.Service
|
||||
func (sl *SignerListenerEndpoint) OnStop() {
|
||||
sl.instanceMtx.Lock()
|
||||
defer sl.instanceMtx.Unlock()
|
||||
_ = sl.Close()
|
||||
|
||||
// Stop listening
|
||||
if sl.listener != nil {
|
||||
if err := sl.listener.Close(); err != nil {
|
||||
sl.Logger.Error("Closing Listener", "err", err)
|
||||
sl.listener = nil
|
||||
}
|
||||
}
|
||||
|
||||
sl.pingTimer.Stop()
|
||||
}
|
||||
|
||||
// WaitForConnection waits maxWait for a connection or returns a timeout error
|
||||
func (sl *SignerListenerEndpoint) WaitForConnection(maxWait time.Duration) error {
|
||||
sl.instanceMtx.Lock()
|
||||
defer sl.instanceMtx.Unlock()
|
||||
return sl.ensureConnection(maxWait)
|
||||
}
|
||||
|
||||
// SendRequest ensures there is a connection, sends a request and waits for a response
|
||||
func (sl *SignerListenerEndpoint) SendRequest(request SignerMessage) (SignerMessage, error) {
|
||||
sl.instanceMtx.Lock()
|
||||
defer sl.instanceMtx.Unlock()
|
||||
|
||||
err := sl.ensureConnection(sl.timeoutAccept)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = sl.WriteMessage(request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := sl.ReadMessage()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (sl *SignerListenerEndpoint) ensureConnection(maxWait time.Duration) error {
|
||||
if sl.IsConnected() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Is there a connection ready? then use it
|
||||
if sl.GetAvailableConnection(sl.connectionAvailableCh) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// block until connected or timeout
|
||||
sl.triggerConnect()
|
||||
err := sl.WaitConnection(sl.connectionAvailableCh, maxWait)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sl *SignerListenerEndpoint) acceptNewConnection() (net.Conn, error) {
|
||||
if !sl.IsRunning() || sl.listener == nil {
|
||||
return nil, fmt.Errorf("endpoint is closing")
|
||||
}
|
||||
|
||||
// wait for a new conn
|
||||
sl.Logger.Info("SignerListener: Listening for new connection")
|
||||
conn, err := sl.listener.Accept()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (sl *SignerListenerEndpoint) triggerConnect() {
|
||||
select {
|
||||
case sl.connectRequestCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (sl *SignerListenerEndpoint) triggerReconnect() {
|
||||
sl.DropConnection()
|
||||
sl.triggerConnect()
|
||||
}
|
||||
|
||||
func (sl *SignerListenerEndpoint) serviceLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-sl.connectRequestCh:
|
||||
{
|
||||
conn, err := sl.acceptNewConnection()
|
||||
if err == nil {
|
||||
sl.Logger.Info("SignerListener: Connected")
|
||||
|
||||
// We have a good connection, wait for someone that needs one otherwise cancellation
|
||||
select {
|
||||
case sl.connectionAvailableCh <- conn:
|
||||
case <-sl.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
select {
|
||||
case sl.connectRequestCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
case <-sl.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sl *SignerListenerEndpoint) pingLoop() {
|
||||
for {
|
||||
select {
|
||||
case <-sl.pingTimer.C:
|
||||
{
|
||||
_, err := sl.SendRequest(&PingRequest{})
|
||||
if err != nil {
|
||||
sl.Logger.Error("SignerListener: Ping timeout")
|
||||
sl.triggerReconnect()
|
||||
}
|
||||
}
|
||||
case <-sl.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
198
privval/signer_listener_endpoint_test.go
Normal file
198
privval/signer_listener_endpoint_test.go
Normal file
@@ -0,0 +1,198 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
var (
|
||||
testTimeoutAccept = defaultTimeoutAcceptSeconds * time.Second
|
||||
|
||||
testTimeoutReadWrite = 100 * time.Millisecond
|
||||
testTimeoutReadWrite2o3 = 60 * time.Millisecond // 2/3 of the other one
|
||||
)
|
||||
|
||||
type dialerTestCase struct {
|
||||
addr string
|
||||
dialer SocketDialer
|
||||
}
|
||||
|
||||
// TestSignerRemoteRetryTCPOnly will test connection retry attempts over TCP. We
|
||||
// don't need this for Unix sockets because the OS instantly knows the state of
|
||||
// both ends of the socket connection. This basically causes the
|
||||
// SignerDialerEndpoint.dialer() call inside SignerDialerEndpoint.acceptNewConnection() to return
|
||||
// successfully immediately, putting an instant stop to any retry attempts.
|
||||
func TestSignerRemoteRetryTCPOnly(t *testing.T) {
|
||||
var (
|
||||
attemptCh = make(chan int)
|
||||
retries = 10
|
||||
)
|
||||
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
// Continuously Accept connection and close {attempts} times
|
||||
go func(ln net.Listener, attemptCh chan<- int) {
|
||||
attempts := 0
|
||||
for {
|
||||
conn, err := ln.Accept()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = conn.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
attempts++
|
||||
|
||||
if attempts == retries {
|
||||
attemptCh <- attempts
|
||||
break
|
||||
}
|
||||
}
|
||||
}(ln, attemptCh)
|
||||
|
||||
dialerEndpoint := NewSignerDialerEndpoint(
|
||||
log.TestingLogger(),
|
||||
DialTCPFn(ln.Addr().String(), testTimeoutReadWrite, ed25519.GenPrivKey()),
|
||||
)
|
||||
SignerDialerEndpointTimeoutReadWrite(time.Millisecond)(dialerEndpoint)
|
||||
SignerDialerEndpointConnRetries(retries)(dialerEndpoint)
|
||||
|
||||
chainId := cmn.RandStr(12)
|
||||
mockPV := types.NewMockPV()
|
||||
signerServer := NewSignerServer(dialerEndpoint, chainId, mockPV)
|
||||
|
||||
err = signerServer.Start()
|
||||
require.NoError(t, err)
|
||||
defer signerServer.Stop()
|
||||
|
||||
select {
|
||||
case attempts := <-attemptCh:
|
||||
assert.Equal(t, retries, attempts)
|
||||
case <-time.After(1500 * time.Millisecond):
|
||||
t.Error("expected remote to observe connection attempts")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRetryConnToRemoteSigner(t *testing.T) {
|
||||
for _, tc := range getDialerTestCases(t) {
|
||||
var (
|
||||
logger = log.TestingLogger()
|
||||
chainID = cmn.RandStr(12)
|
||||
mockPV = types.NewMockPV()
|
||||
endpointIsOpenCh = make(chan struct{})
|
||||
thisConnTimeout = testTimeoutReadWrite
|
||||
listenerEndpoint = newSignerListenerEndpoint(logger, tc.addr, thisConnTimeout)
|
||||
)
|
||||
|
||||
dialerEndpoint := NewSignerDialerEndpoint(
|
||||
logger,
|
||||
tc.dialer,
|
||||
)
|
||||
SignerDialerEndpointTimeoutReadWrite(testTimeoutReadWrite)(dialerEndpoint)
|
||||
SignerDialerEndpointConnRetries(10)(dialerEndpoint)
|
||||
|
||||
signerServer := NewSignerServer(dialerEndpoint, chainID, mockPV)
|
||||
|
||||
startListenerEndpointAsync(t, listenerEndpoint, endpointIsOpenCh)
|
||||
defer listenerEndpoint.Stop()
|
||||
|
||||
require.NoError(t, signerServer.Start())
|
||||
assert.True(t, signerServer.IsRunning())
|
||||
<-endpointIsOpenCh
|
||||
signerServer.Stop()
|
||||
|
||||
dialerEndpoint2 := NewSignerDialerEndpoint(
|
||||
logger,
|
||||
tc.dialer,
|
||||
)
|
||||
signerServer2 := NewSignerServer(dialerEndpoint2, chainID, mockPV)
|
||||
|
||||
// let some pings pass
|
||||
require.NoError(t, signerServer2.Start())
|
||||
assert.True(t, signerServer2.IsRunning())
|
||||
defer signerServer2.Stop()
|
||||
|
||||
// give the client some time to re-establish the conn to the remote signer
|
||||
// should see sth like this in the logs:
|
||||
//
|
||||
// E[10016-01-10|17:12:46.128] Ping err="remote signer timed out"
|
||||
// I[10016-01-10|17:16:42.447] Re-created connection to remote signer impl=SocketVal
|
||||
time.Sleep(testTimeoutReadWrite * 2)
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////
|
||||
|
||||
func newSignerListenerEndpoint(logger log.Logger, addr string, timeoutReadWrite time.Duration) *SignerListenerEndpoint {
|
||||
proto, address := cmn.ProtocolAndAddress(addr)
|
||||
|
||||
ln, err := net.Listen(proto, address)
|
||||
logger.Info("SignerListener: Listening", "proto", proto, "address", address)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
var listener net.Listener
|
||||
|
||||
if proto == "unix" {
|
||||
unixLn := NewUnixListener(ln)
|
||||
UnixListenerTimeoutAccept(testTimeoutAccept)(unixLn)
|
||||
UnixListenerTimeoutReadWrite(timeoutReadWrite)(unixLn)
|
||||
listener = unixLn
|
||||
} else {
|
||||
tcpLn := NewTCPListener(ln, ed25519.GenPrivKey())
|
||||
TCPListenerTimeoutAccept(testTimeoutAccept)(tcpLn)
|
||||
TCPListenerTimeoutReadWrite(timeoutReadWrite)(tcpLn)
|
||||
listener = tcpLn
|
||||
}
|
||||
|
||||
return NewSignerListenerEndpoint(logger, listener)
|
||||
}
|
||||
|
||||
func startListenerEndpointAsync(t *testing.T, sle *SignerListenerEndpoint, endpointIsOpenCh chan struct{}) {
|
||||
go func(sle *SignerListenerEndpoint) {
|
||||
require.NoError(t, sle.Start())
|
||||
assert.True(t, sle.IsRunning())
|
||||
close(endpointIsOpenCh)
|
||||
}(sle)
|
||||
}
|
||||
|
||||
func getMockEndpoints(
|
||||
t *testing.T,
|
||||
addr string,
|
||||
socketDialer SocketDialer,
|
||||
) (*SignerListenerEndpoint, *SignerDialerEndpoint) {
|
||||
|
||||
var (
|
||||
logger = log.TestingLogger()
|
||||
endpointIsOpenCh = make(chan struct{})
|
||||
|
||||
dialerEndpoint = NewSignerDialerEndpoint(
|
||||
logger,
|
||||
socketDialer,
|
||||
)
|
||||
|
||||
listenerEndpoint = newSignerListenerEndpoint(logger, addr, testTimeoutReadWrite)
|
||||
)
|
||||
|
||||
SignerDialerEndpointTimeoutReadWrite(testTimeoutReadWrite)(dialerEndpoint)
|
||||
SignerDialerEndpointConnRetries(1e6)(dialerEndpoint)
|
||||
|
||||
startListenerEndpointAsync(t, listenerEndpoint, endpointIsOpenCh)
|
||||
|
||||
require.NoError(t, dialerEndpoint.Start())
|
||||
assert.True(t, dialerEndpoint.IsRunning())
|
||||
|
||||
<-endpointIsOpenCh
|
||||
|
||||
return listenerEndpoint, dialerEndpoint
|
||||
}
|
@@ -1,192 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// SignerRemote implements PrivValidator.
|
||||
// It uses a net.Conn to request signatures from an external process.
|
||||
type SignerRemote struct {
|
||||
conn net.Conn
|
||||
|
||||
// memoized
|
||||
consensusPubKey crypto.PubKey
|
||||
}
|
||||
|
||||
// Check that SignerRemote implements PrivValidator.
|
||||
var _ types.PrivValidator = (*SignerRemote)(nil)
|
||||
|
||||
// NewSignerRemote returns an instance of SignerRemote.
|
||||
func NewSignerRemote(conn net.Conn) (*SignerRemote, error) {
|
||||
|
||||
// retrieve and memoize the consensus public key once.
|
||||
pubKey, err := getPubKey(conn)
|
||||
if err != nil {
|
||||
return nil, cmn.ErrorWrap(err, "error while retrieving public key for remote signer")
|
||||
}
|
||||
return &SignerRemote{
|
||||
conn: conn,
|
||||
consensusPubKey: pubKey,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Close calls Close on the underlying net.Conn.
|
||||
func (sc *SignerRemote) Close() error {
|
||||
return sc.conn.Close()
|
||||
}
|
||||
|
||||
// GetPubKey implements PrivValidator.
|
||||
func (sc *SignerRemote) GetPubKey() crypto.PubKey {
|
||||
return sc.consensusPubKey
|
||||
}
|
||||
|
||||
// not thread-safe (only called on startup).
|
||||
func getPubKey(conn net.Conn) (crypto.PubKey, error) {
|
||||
err := writeMsg(conn, &PubKeyRequest{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := readMsg(conn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pubKeyResp, ok := res.(*PubKeyResponse)
|
||||
if !ok {
|
||||
return nil, errors.Wrap(ErrUnexpectedResponse, "response is not PubKeyResponse")
|
||||
}
|
||||
|
||||
if pubKeyResp.Error != nil {
|
||||
return nil, errors.Wrap(pubKeyResp.Error, "failed to get private validator's public key")
|
||||
}
|
||||
|
||||
return pubKeyResp.PubKey, nil
|
||||
}
|
||||
|
||||
// SignVote implements PrivValidator.
|
||||
func (sc *SignerRemote) SignVote(chainID string, vote *types.Vote) error {
|
||||
err := writeMsg(sc.conn, &SignVoteRequest{Vote: vote})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := readMsg(sc.conn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp, ok := res.(*SignedVoteResponse)
|
||||
if !ok {
|
||||
return ErrUnexpectedResponse
|
||||
}
|
||||
if resp.Error != nil {
|
||||
return resp.Error
|
||||
}
|
||||
*vote = *resp.Vote
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SignProposal implements PrivValidator.
|
||||
func (sc *SignerRemote) SignProposal(chainID string, proposal *types.Proposal) error {
|
||||
err := writeMsg(sc.conn, &SignProposalRequest{Proposal: proposal})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := readMsg(sc.conn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
resp, ok := res.(*SignedProposalResponse)
|
||||
if !ok {
|
||||
return ErrUnexpectedResponse
|
||||
}
|
||||
if resp.Error != nil {
|
||||
return resp.Error
|
||||
}
|
||||
*proposal = *resp.Proposal
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Ping is used to check connection health.
|
||||
func (sc *SignerRemote) Ping() error {
|
||||
err := writeMsg(sc.conn, &PingRequest{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
res, err := readMsg(sc.conn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, ok := res.(*PingResponse)
|
||||
if !ok {
|
||||
return ErrUnexpectedResponse
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func readMsg(r io.Reader) (msg RemoteSignerMsg, err error) {
|
||||
const maxRemoteSignerMsgSize = 1024 * 10
|
||||
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(r, &msg, maxRemoteSignerMsgSize)
|
||||
if _, ok := err.(timeoutError); ok {
|
||||
err = cmn.ErrorWrap(ErrConnTimeout, err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func writeMsg(w io.Writer, msg interface{}) (err error) {
|
||||
_, err = cdc.MarshalBinaryLengthPrefixedWriter(w, msg)
|
||||
if _, ok := err.(timeoutError); ok {
|
||||
err = cmn.ErrorWrap(ErrConnTimeout, err.Error())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func handleRequest(req RemoteSignerMsg, chainID string, privVal types.PrivValidator) (RemoteSignerMsg, error) {
|
||||
var res RemoteSignerMsg
|
||||
var err error
|
||||
|
||||
switch r := req.(type) {
|
||||
case *PubKeyRequest:
|
||||
var p crypto.PubKey
|
||||
p = privVal.GetPubKey()
|
||||
res = &PubKeyResponse{p, nil}
|
||||
|
||||
case *SignVoteRequest:
|
||||
err = privVal.SignVote(chainID, r.Vote)
|
||||
if err != nil {
|
||||
res = &SignedVoteResponse{nil, &RemoteSignerError{0, err.Error()}}
|
||||
} else {
|
||||
res = &SignedVoteResponse{r.Vote, nil}
|
||||
}
|
||||
|
||||
case *SignProposalRequest:
|
||||
err = privVal.SignProposal(chainID, r.Proposal)
|
||||
if err != nil {
|
||||
res = &SignedProposalResponse{nil, &RemoteSignerError{0, err.Error()}}
|
||||
} else {
|
||||
res = &SignedProposalResponse{r.Proposal, nil}
|
||||
}
|
||||
|
||||
case *PingRequest:
|
||||
res = &PingResponse{}
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("unknown msg: %v", r)
|
||||
}
|
||||
|
||||
return res, err
|
||||
}
|
@@ -1,68 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// TestSignerRemoteRetryTCPOnly will test connection retry attempts over TCP. We
|
||||
// don't need this for Unix sockets because the OS instantly knows the state of
|
||||
// both ends of the socket connection. This basically causes the
|
||||
// SignerServiceEndpoint.dialer() call inside SignerServiceEndpoint.connect() to return
|
||||
// successfully immediately, putting an instant stop to any retry attempts.
|
||||
func TestSignerRemoteRetryTCPOnly(t *testing.T) {
|
||||
var (
|
||||
attemptCh = make(chan int)
|
||||
retries = 2
|
||||
)
|
||||
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
go func(ln net.Listener, attemptCh chan<- int) {
|
||||
attempts := 0
|
||||
|
||||
for {
|
||||
conn, err := ln.Accept()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = conn.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
attempts++
|
||||
|
||||
if attempts == retries {
|
||||
attemptCh <- attempts
|
||||
break
|
||||
}
|
||||
}
|
||||
}(ln, attemptCh)
|
||||
|
||||
serviceEndpoint := NewSignerServiceEndpoint(
|
||||
log.TestingLogger(),
|
||||
cmn.RandStr(12),
|
||||
types.NewMockPV(),
|
||||
DialTCPFn(ln.Addr().String(), testTimeoutReadWrite, ed25519.GenPrivKey()),
|
||||
)
|
||||
defer serviceEndpoint.Stop()
|
||||
|
||||
SignerServiceEndpointTimeoutReadWrite(time.Millisecond)(serviceEndpoint)
|
||||
SignerServiceEndpointConnRetries(retries)(serviceEndpoint)
|
||||
|
||||
assert.Equal(t, serviceEndpoint.Start(), ErrDialRetryMax)
|
||||
|
||||
select {
|
||||
case attempts := <-attemptCh:
|
||||
assert.Equal(t, retries, attempts)
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Error("expected remote to observe connection attempts")
|
||||
}
|
||||
}
|
44
privval/signer_requestHandler.go
Normal file
44
privval/signer_requestHandler.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
func DefaultValidationRequestHandler(privVal types.PrivValidator, req SignerMessage, chainID string) (SignerMessage, error) {
|
||||
var res SignerMessage
|
||||
var err error
|
||||
|
||||
switch r := req.(type) {
|
||||
case *PubKeyRequest:
|
||||
var p crypto.PubKey
|
||||
p = privVal.GetPubKey()
|
||||
res = &PubKeyResponse{p, nil}
|
||||
|
||||
case *SignVoteRequest:
|
||||
err = privVal.SignVote(chainID, r.Vote)
|
||||
if err != nil {
|
||||
res = &SignedVoteResponse{nil, &RemoteSignerError{0, err.Error()}}
|
||||
} else {
|
||||
res = &SignedVoteResponse{r.Vote, nil}
|
||||
}
|
||||
|
||||
case *SignProposalRequest:
|
||||
err = privVal.SignProposal(chainID, r.Proposal)
|
||||
if err != nil {
|
||||
res = &SignedProposalResponse{nil, &RemoteSignerError{0, err.Error()}}
|
||||
} else {
|
||||
res = &SignedProposalResponse{r.Proposal, nil}
|
||||
}
|
||||
|
||||
case *PingRequest:
|
||||
err, res = nil, &PingResponse{}
|
||||
|
||||
default:
|
||||
err = fmt.Errorf("unknown msg: %v", r)
|
||||
}
|
||||
|
||||
return res, err
|
||||
}
|
107
privval/signer_server.go
Normal file
107
privval/signer_server.go
Normal file
@@ -0,0 +1,107 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// ValidationRequestHandlerFunc handles different remoteSigner requests
|
||||
type ValidationRequestHandlerFunc func(
|
||||
privVal types.PrivValidator,
|
||||
requestMessage SignerMessage,
|
||||
chainID string) (SignerMessage, error)
|
||||
|
||||
type SignerServer struct {
|
||||
cmn.BaseService
|
||||
|
||||
endpoint *SignerDialerEndpoint
|
||||
chainID string
|
||||
privVal types.PrivValidator
|
||||
|
||||
handlerMtx sync.Mutex
|
||||
validationRequestHandler ValidationRequestHandlerFunc
|
||||
}
|
||||
|
||||
func NewSignerServer(endpoint *SignerDialerEndpoint, chainID string, privVal types.PrivValidator) *SignerServer {
|
||||
ss := &SignerServer{
|
||||
endpoint: endpoint,
|
||||
chainID: chainID,
|
||||
privVal: privVal,
|
||||
validationRequestHandler: DefaultValidationRequestHandler,
|
||||
}
|
||||
|
||||
ss.BaseService = *cmn.NewBaseService(endpoint.Logger, "SignerServer", ss)
|
||||
|
||||
return ss
|
||||
}
|
||||
|
||||
// OnStart implements cmn.Service.
|
||||
func (ss *SignerServer) OnStart() error {
|
||||
go ss.serviceLoop()
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements cmn.Service.
|
||||
func (ss *SignerServer) OnStop() {
|
||||
ss.endpoint.Logger.Debug("SignerServer: OnStop calling Close")
|
||||
_ = ss.endpoint.Close()
|
||||
}
|
||||
|
||||
// SetRequestHandler override the default function that is used to service requests
|
||||
func (ss *SignerServer) SetRequestHandler(validationRequestHandler ValidationRequestHandlerFunc) {
|
||||
ss.handlerMtx.Lock()
|
||||
defer ss.handlerMtx.Unlock()
|
||||
ss.validationRequestHandler = validationRequestHandler
|
||||
}
|
||||
|
||||
func (ss *SignerServer) servicePendingRequest() {
|
||||
if !ss.IsRunning() {
|
||||
return // Ignore error from closing.
|
||||
}
|
||||
|
||||
req, err := ss.endpoint.ReadMessage()
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
ss.Logger.Error("SignerServer: HandleMessage", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
var res SignerMessage
|
||||
{
|
||||
// limit the scope of the lock
|
||||
ss.handlerMtx.Lock()
|
||||
defer ss.handlerMtx.Unlock()
|
||||
res, err = ss.validationRequestHandler(ss.privVal, req, ss.chainID)
|
||||
if err != nil {
|
||||
// only log the error; we'll reply with an error in res
|
||||
ss.Logger.Error("SignerServer: handleMessage", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
if res != nil {
|
||||
err = ss.endpoint.WriteMessage(res)
|
||||
if err != nil {
|
||||
ss.Logger.Error("SignerServer: writeMessage", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ss *SignerServer) serviceLoop() {
|
||||
for {
|
||||
select {
|
||||
default:
|
||||
err := ss.endpoint.ensureConnection()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
ss.servicePendingRequest()
|
||||
|
||||
case <-ss.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,139 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
// SignerServiceEndpointOption sets an optional parameter on the SignerServiceEndpoint.
|
||||
type SignerServiceEndpointOption func(*SignerServiceEndpoint)
|
||||
|
||||
// SignerServiceEndpointTimeoutReadWrite sets the read and write timeout for connections
|
||||
// from external signing processes.
|
||||
func SignerServiceEndpointTimeoutReadWrite(timeout time.Duration) SignerServiceEndpointOption {
|
||||
return func(ss *SignerServiceEndpoint) { ss.timeoutReadWrite = timeout }
|
||||
}
|
||||
|
||||
// SignerServiceEndpointConnRetries sets the amount of attempted retries to connect.
|
||||
func SignerServiceEndpointConnRetries(retries int) SignerServiceEndpointOption {
|
||||
return func(ss *SignerServiceEndpoint) { ss.connRetries = retries }
|
||||
}
|
||||
|
||||
// SignerServiceEndpoint dials using its dialer and responds to any
|
||||
// signature requests using its privVal.
|
||||
type SignerServiceEndpoint struct {
|
||||
cmn.BaseService
|
||||
|
||||
chainID string
|
||||
timeoutReadWrite time.Duration
|
||||
connRetries int
|
||||
privVal types.PrivValidator
|
||||
|
||||
dialer SocketDialer
|
||||
conn net.Conn
|
||||
}
|
||||
|
||||
// NewSignerServiceEndpoint returns a SignerServiceEndpoint that will dial using the given
|
||||
// dialer and respond to any signature requests over the connection
|
||||
// using the given privVal.
|
||||
func NewSignerServiceEndpoint(
|
||||
logger log.Logger,
|
||||
chainID string,
|
||||
privVal types.PrivValidator,
|
||||
dialer SocketDialer,
|
||||
) *SignerServiceEndpoint {
|
||||
se := &SignerServiceEndpoint{
|
||||
chainID: chainID,
|
||||
timeoutReadWrite: time.Second * defaultTimeoutReadWriteSeconds,
|
||||
connRetries: defaultMaxDialRetries,
|
||||
privVal: privVal,
|
||||
dialer: dialer,
|
||||
}
|
||||
|
||||
se.BaseService = *cmn.NewBaseService(logger, "SignerServiceEndpoint", se)
|
||||
return se
|
||||
}
|
||||
|
||||
// OnStart implements cmn.Service.
|
||||
func (se *SignerServiceEndpoint) OnStart() error {
|
||||
conn, err := se.connect()
|
||||
if err != nil {
|
||||
se.Logger.Error("OnStart", "err", err)
|
||||
return err
|
||||
}
|
||||
|
||||
se.conn = conn
|
||||
go se.handleConnection(conn)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements cmn.Service.
|
||||
func (se *SignerServiceEndpoint) OnStop() {
|
||||
if se.conn == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err := se.conn.Close(); err != nil {
|
||||
se.Logger.Error("OnStop", "err", cmn.ErrorWrap(err, "closing listener failed"))
|
||||
}
|
||||
}
|
||||
|
||||
func (se *SignerServiceEndpoint) connect() (net.Conn, error) {
|
||||
for retries := 0; retries < se.connRetries; retries++ {
|
||||
// Don't sleep if it is the first retry.
|
||||
if retries > 0 {
|
||||
time.Sleep(se.timeoutReadWrite)
|
||||
}
|
||||
|
||||
conn, err := se.dialer()
|
||||
if err == nil {
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
se.Logger.Error("dialing", "err", err)
|
||||
}
|
||||
|
||||
return nil, ErrDialRetryMax
|
||||
}
|
||||
|
||||
func (se *SignerServiceEndpoint) handleConnection(conn net.Conn) {
|
||||
for {
|
||||
if !se.IsRunning() {
|
||||
return // Ignore error from listener closing.
|
||||
}
|
||||
|
||||
// Reset the connection deadline
|
||||
deadline := time.Now().Add(se.timeoutReadWrite)
|
||||
err := conn.SetDeadline(deadline)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
req, err := readMsg(conn)
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
se.Logger.Error("handleConnection readMsg", "err", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
res, err := handleRequest(req, se.chainID, se.privVal)
|
||||
|
||||
if err != nil {
|
||||
// only log the error; we'll reply with an error in res
|
||||
se.Logger.Error("handleConnection handleRequest", "err", err)
|
||||
}
|
||||
|
||||
err = writeMsg(conn, res)
|
||||
if err != nil {
|
||||
se.Logger.Error("handleConnection writeMsg", "err", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
@@ -1,230 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultHeartbeatSeconds = 2
|
||||
defaultMaxDialRetries = 10
|
||||
)
|
||||
|
||||
var (
|
||||
heartbeatPeriod = time.Second * defaultHeartbeatSeconds
|
||||
)
|
||||
|
||||
// SignerValidatorEndpointOption sets an optional parameter on the SocketVal.
|
||||
type SignerValidatorEndpointOption func(*SignerValidatorEndpoint)
|
||||
|
||||
// SignerValidatorEndpointSetHeartbeat sets the period on which to check the liveness of the
|
||||
// connected Signer connections.
|
||||
func SignerValidatorEndpointSetHeartbeat(period time.Duration) SignerValidatorEndpointOption {
|
||||
return func(sc *SignerValidatorEndpoint) { sc.heartbeatPeriod = period }
|
||||
}
|
||||
|
||||
// SocketVal implements PrivValidator.
|
||||
// It listens for an external process to dial in and uses
|
||||
// the socket to request signatures.
|
||||
type SignerValidatorEndpoint struct {
|
||||
cmn.BaseService
|
||||
|
||||
listener net.Listener
|
||||
|
||||
// ping
|
||||
cancelPingCh chan struct{}
|
||||
pingTicker *time.Ticker
|
||||
heartbeatPeriod time.Duration
|
||||
|
||||
// signer is mutable since it can be reset if the connection fails.
|
||||
// failures are detected by a background ping routine.
|
||||
// All messages are request/response, so we hold the mutex
|
||||
// so only one request/response pair can happen at a time.
|
||||
// Methods on the underlying net.Conn itself are already goroutine safe.
|
||||
mtx sync.Mutex
|
||||
|
||||
// TODO: Signer should encapsulate and hide the endpoint completely. Invert the relation
|
||||
signer *SignerRemote
|
||||
}
|
||||
|
||||
// Check that SignerValidatorEndpoint implements PrivValidator.
|
||||
var _ types.PrivValidator = (*SignerValidatorEndpoint)(nil)
|
||||
|
||||
// NewSignerValidatorEndpoint returns an instance of SignerValidatorEndpoint.
|
||||
func NewSignerValidatorEndpoint(logger log.Logger, listener net.Listener) *SignerValidatorEndpoint {
|
||||
sc := &SignerValidatorEndpoint{
|
||||
listener: listener,
|
||||
heartbeatPeriod: heartbeatPeriod,
|
||||
}
|
||||
|
||||
sc.BaseService = *cmn.NewBaseService(logger, "SignerValidatorEndpoint", sc)
|
||||
|
||||
return sc
|
||||
}
|
||||
|
||||
//--------------------------------------------------------
|
||||
// Implement PrivValidator
|
||||
|
||||
// GetPubKey implements PrivValidator.
|
||||
func (ve *SignerValidatorEndpoint) GetPubKey() crypto.PubKey {
|
||||
ve.mtx.Lock()
|
||||
defer ve.mtx.Unlock()
|
||||
return ve.signer.GetPubKey()
|
||||
}
|
||||
|
||||
// SignVote implements PrivValidator.
|
||||
func (ve *SignerValidatorEndpoint) SignVote(chainID string, vote *types.Vote) error {
|
||||
ve.mtx.Lock()
|
||||
defer ve.mtx.Unlock()
|
||||
return ve.signer.SignVote(chainID, vote)
|
||||
}
|
||||
|
||||
// SignProposal implements PrivValidator.
|
||||
func (ve *SignerValidatorEndpoint) SignProposal(chainID string, proposal *types.Proposal) error {
|
||||
ve.mtx.Lock()
|
||||
defer ve.mtx.Unlock()
|
||||
return ve.signer.SignProposal(chainID, proposal)
|
||||
}
|
||||
|
||||
//--------------------------------------------------------
|
||||
// More thread safe methods proxied to the signer
|
||||
|
||||
// Ping is used to check connection health.
|
||||
func (ve *SignerValidatorEndpoint) Ping() error {
|
||||
ve.mtx.Lock()
|
||||
defer ve.mtx.Unlock()
|
||||
return ve.signer.Ping()
|
||||
}
|
||||
|
||||
// Close closes the underlying net.Conn.
|
||||
func (ve *SignerValidatorEndpoint) Close() {
|
||||
ve.mtx.Lock()
|
||||
defer ve.mtx.Unlock()
|
||||
if ve.signer != nil {
|
||||
if err := ve.signer.Close(); err != nil {
|
||||
ve.Logger.Error("OnStop", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
if ve.listener != nil {
|
||||
if err := ve.listener.Close(); err != nil {
|
||||
ve.Logger.Error("OnStop", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//--------------------------------------------------------
|
||||
// Service start and stop
|
||||
|
||||
// OnStart implements cmn.Service.
|
||||
func (ve *SignerValidatorEndpoint) OnStart() error {
|
||||
if closed, err := ve.reset(); err != nil {
|
||||
ve.Logger.Error("OnStart", "err", err)
|
||||
return err
|
||||
} else if closed {
|
||||
return fmt.Errorf("listener is closed")
|
||||
}
|
||||
|
||||
// Start a routine to keep the connection alive
|
||||
ve.cancelPingCh = make(chan struct{}, 1)
|
||||
ve.pingTicker = time.NewTicker(ve.heartbeatPeriod)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ve.pingTicker.C:
|
||||
err := ve.Ping()
|
||||
if err != nil {
|
||||
ve.Logger.Error("Ping", "err", err)
|
||||
if err == ErrUnexpectedResponse {
|
||||
return
|
||||
}
|
||||
|
||||
closed, err := ve.reset()
|
||||
if err != nil {
|
||||
ve.Logger.Error("Reconnecting to remote signer failed", "err", err)
|
||||
continue
|
||||
}
|
||||
if closed {
|
||||
ve.Logger.Info("listener is closing")
|
||||
return
|
||||
}
|
||||
|
||||
ve.Logger.Info("Re-created connection to remote signer", "impl", ve)
|
||||
}
|
||||
case <-ve.cancelPingCh:
|
||||
ve.pingTicker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// OnStop implements cmn.Service.
|
||||
func (ve *SignerValidatorEndpoint) OnStop() {
|
||||
if ve.cancelPingCh != nil {
|
||||
close(ve.cancelPingCh)
|
||||
}
|
||||
ve.Close()
|
||||
}
|
||||
|
||||
//--------------------------------------------------------
|
||||
// Connection and signer management
|
||||
|
||||
// waits to accept and sets a new connection.
|
||||
// connection is closed in OnStop.
|
||||
// returns true if the listener is closed
|
||||
// (ie. it returns a nil conn).
|
||||
func (ve *SignerValidatorEndpoint) reset() (closed bool, err error) {
|
||||
ve.mtx.Lock()
|
||||
defer ve.mtx.Unlock()
|
||||
|
||||
// first check if the conn already exists and close it.
|
||||
if ve.signer != nil {
|
||||
if tmpErr := ve.signer.Close(); tmpErr != nil {
|
||||
ve.Logger.Error("error closing socket val connection during reset", "err", tmpErr)
|
||||
}
|
||||
}
|
||||
|
||||
// wait for a new conn
|
||||
conn, err := ve.acceptConnection()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// listener is closed
|
||||
if conn == nil {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
ve.signer, err = NewSignerRemote(conn)
|
||||
if err != nil {
|
||||
// failed to fetch the pubkey. close out the connection.
|
||||
if tmpErr := conn.Close(); tmpErr != nil {
|
||||
ve.Logger.Error("error closing connection", "err", tmpErr)
|
||||
}
|
||||
return false, err
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Attempt to accept a connection.
|
||||
// Times out after the listener's timeoutAccept
|
||||
func (ve *SignerValidatorEndpoint) acceptConnection() (net.Conn, error) {
|
||||
conn, err := ve.listener.Accept()
|
||||
if err != nil {
|
||||
if !ve.IsRunning() {
|
||||
return nil, nil // Ignore error from listener closing.
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return conn, nil
|
||||
}
|
@@ -1,506 +0,0 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
var (
|
||||
testTimeoutAccept = defaultTimeoutAcceptSeconds * time.Second
|
||||
|
||||
testTimeoutReadWrite = 100 * time.Millisecond
|
||||
testTimeoutReadWrite2o3 = 66 * time.Millisecond // 2/3 of the other one
|
||||
|
||||
testTimeoutHeartbeat = 10 * time.Millisecond
|
||||
testTimeoutHeartbeat3o2 = 6 * time.Millisecond // 3/2 of the other one
|
||||
)
|
||||
|
||||
type socketTestCase struct {
|
||||
addr string
|
||||
dialer SocketDialer
|
||||
}
|
||||
|
||||
func socketTestCases(t *testing.T) []socketTestCase {
|
||||
tcpAddr := fmt.Sprintf("tcp://%s", testFreeTCPAddr(t))
|
||||
unixFilePath, err := testUnixAddr()
|
||||
require.NoError(t, err)
|
||||
unixAddr := fmt.Sprintf("unix://%s", unixFilePath)
|
||||
return []socketTestCase{
|
||||
{
|
||||
addr: tcpAddr,
|
||||
dialer: DialTCPFn(tcpAddr, testTimeoutReadWrite, ed25519.GenPrivKey()),
|
||||
},
|
||||
{
|
||||
addr: unixAddr,
|
||||
dialer: DialUnixFn(unixFilePath),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestSocketPVAddress(t *testing.T) {
|
||||
for _, tc := range socketTestCases(t) {
|
||||
// Execute the test within a closure to ensure the deferred statements
|
||||
// are called between each for loop iteration, for isolated test cases.
|
||||
func() {
|
||||
var (
|
||||
chainID = cmn.RandStr(12)
|
||||
validatorEndpoint, serviceEndpoint = testSetupSocketPair(t, chainID, types.NewMockPV(), tc.addr, tc.dialer)
|
||||
)
|
||||
defer validatorEndpoint.Stop()
|
||||
defer serviceEndpoint.Stop()
|
||||
|
||||
serviceAddr := serviceEndpoint.privVal.GetPubKey().Address()
|
||||
validatorAddr := validatorEndpoint.GetPubKey().Address()
|
||||
|
||||
assert.Equal(t, serviceAddr, validatorAddr)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func TestSocketPVPubKey(t *testing.T) {
|
||||
for _, tc := range socketTestCases(t) {
|
||||
func() {
|
||||
var (
|
||||
chainID = cmn.RandStr(12)
|
||||
validatorEndpoint, serviceEndpoint = testSetupSocketPair(
|
||||
t,
|
||||
chainID,
|
||||
types.NewMockPV(),
|
||||
tc.addr,
|
||||
tc.dialer)
|
||||
)
|
||||
defer validatorEndpoint.Stop()
|
||||
defer serviceEndpoint.Stop()
|
||||
|
||||
clientKey := validatorEndpoint.GetPubKey()
|
||||
privvalPubKey := serviceEndpoint.privVal.GetPubKey()
|
||||
|
||||
assert.Equal(t, privvalPubKey, clientKey)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func TestSocketPVProposal(t *testing.T) {
|
||||
for _, tc := range socketTestCases(t) {
|
||||
func() {
|
||||
var (
|
||||
chainID = cmn.RandStr(12)
|
||||
validatorEndpoint, serviceEndpoint = testSetupSocketPair(
|
||||
t,
|
||||
chainID,
|
||||
types.NewMockPV(),
|
||||
tc.addr,
|
||||
tc.dialer)
|
||||
|
||||
ts = time.Now()
|
||||
privProposal = &types.Proposal{Timestamp: ts}
|
||||
clientProposal = &types.Proposal{Timestamp: ts}
|
||||
)
|
||||
defer validatorEndpoint.Stop()
|
||||
defer serviceEndpoint.Stop()
|
||||
|
||||
require.NoError(t, serviceEndpoint.privVal.SignProposal(chainID, privProposal))
|
||||
require.NoError(t, validatorEndpoint.SignProposal(chainID, clientProposal))
|
||||
|
||||
assert.Equal(t, privProposal.Signature, clientProposal.Signature)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func TestSocketPVVote(t *testing.T) {
|
||||
for _, tc := range socketTestCases(t) {
|
||||
func() {
|
||||
var (
|
||||
chainID = cmn.RandStr(12)
|
||||
validatorEndpoint, serviceEndpoint = testSetupSocketPair(
|
||||
t,
|
||||
chainID,
|
||||
types.NewMockPV(),
|
||||
tc.addr,
|
||||
tc.dialer)
|
||||
|
||||
ts = time.Now()
|
||||
vType = types.PrecommitType
|
||||
want = &types.Vote{Timestamp: ts, Type: vType}
|
||||
have = &types.Vote{Timestamp: ts, Type: vType}
|
||||
)
|
||||
defer validatorEndpoint.Stop()
|
||||
defer serviceEndpoint.Stop()
|
||||
|
||||
require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want))
|
||||
require.NoError(t, validatorEndpoint.SignVote(chainID, have))
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func TestSocketPVVoteResetDeadline(t *testing.T) {
|
||||
for _, tc := range socketTestCases(t) {
|
||||
func() {
|
||||
var (
|
||||
chainID = cmn.RandStr(12)
|
||||
validatorEndpoint, serviceEndpoint = testSetupSocketPair(
|
||||
t,
|
||||
chainID,
|
||||
types.NewMockPV(),
|
||||
tc.addr,
|
||||
tc.dialer)
|
||||
|
||||
ts = time.Now()
|
||||
vType = types.PrecommitType
|
||||
want = &types.Vote{Timestamp: ts, Type: vType}
|
||||
have = &types.Vote{Timestamp: ts, Type: vType}
|
||||
)
|
||||
defer validatorEndpoint.Stop()
|
||||
defer serviceEndpoint.Stop()
|
||||
|
||||
time.Sleep(testTimeoutReadWrite2o3)
|
||||
|
||||
require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want))
|
||||
require.NoError(t, validatorEndpoint.SignVote(chainID, have))
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
|
||||
// This would exceed the deadline if it was not extended by the previous message
|
||||
time.Sleep(testTimeoutReadWrite2o3)
|
||||
|
||||
require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want))
|
||||
require.NoError(t, validatorEndpoint.SignVote(chainID, have))
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func TestSocketPVVoteKeepalive(t *testing.T) {
|
||||
for _, tc := range socketTestCases(t) {
|
||||
func() {
|
||||
var (
|
||||
chainID = cmn.RandStr(12)
|
||||
validatorEndpoint, serviceEndpoint = testSetupSocketPair(
|
||||
t,
|
||||
chainID,
|
||||
types.NewMockPV(),
|
||||
tc.addr,
|
||||
tc.dialer)
|
||||
|
||||
ts = time.Now()
|
||||
vType = types.PrecommitType
|
||||
want = &types.Vote{Timestamp: ts, Type: vType}
|
||||
have = &types.Vote{Timestamp: ts, Type: vType}
|
||||
)
|
||||
defer validatorEndpoint.Stop()
|
||||
defer serviceEndpoint.Stop()
|
||||
|
||||
time.Sleep(testTimeoutReadWrite * 2)
|
||||
|
||||
require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want))
|
||||
require.NoError(t, validatorEndpoint.SignVote(chainID, have))
|
||||
assert.Equal(t, want.Signature, have.Signature)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func TestSocketPVDeadline(t *testing.T) {
|
||||
for _, tc := range socketTestCases(t) {
|
||||
func() {
|
||||
var (
|
||||
listenc = make(chan struct{})
|
||||
thisConnTimeout = 100 * time.Millisecond
|
||||
validatorEndpoint = newSignerValidatorEndpoint(log.TestingLogger(), tc.addr, thisConnTimeout)
|
||||
)
|
||||
|
||||
go func(sc *SignerValidatorEndpoint) {
|
||||
defer close(listenc)
|
||||
|
||||
// Note: the TCP connection times out at the accept() phase,
|
||||
// whereas the Unix domain sockets connection times out while
|
||||
// attempting to fetch the remote signer's public key.
|
||||
assert.True(t, IsConnTimeout(sc.Start()))
|
||||
|
||||
assert.False(t, sc.IsRunning())
|
||||
}(validatorEndpoint)
|
||||
|
||||
for {
|
||||
_, err := cmn.Connect(tc.addr)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
<-listenc
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoteSignVoteErrors(t *testing.T) {
|
||||
for _, tc := range socketTestCases(t) {
|
||||
func() {
|
||||
var (
|
||||
chainID = cmn.RandStr(12)
|
||||
validatorEndpoint, serviceEndpoint = testSetupSocketPair(
|
||||
t,
|
||||
chainID,
|
||||
types.NewErroringMockPV(),
|
||||
tc.addr,
|
||||
tc.dialer)
|
||||
|
||||
ts = time.Now()
|
||||
vType = types.PrecommitType
|
||||
vote = &types.Vote{Timestamp: ts, Type: vType}
|
||||
)
|
||||
defer validatorEndpoint.Stop()
|
||||
defer serviceEndpoint.Stop()
|
||||
|
||||
err := validatorEndpoint.SignVote("", vote)
|
||||
require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
|
||||
|
||||
err = serviceEndpoint.privVal.SignVote(chainID, vote)
|
||||
require.Error(t, err)
|
||||
err = validatorEndpoint.SignVote(chainID, vote)
|
||||
require.Error(t, err)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func TestRemoteSignProposalErrors(t *testing.T) {
|
||||
for _, tc := range socketTestCases(t) {
|
||||
func() {
|
||||
var (
|
||||
chainID = cmn.RandStr(12)
|
||||
validatorEndpoint, serviceEndpoint = testSetupSocketPair(
|
||||
t,
|
||||
chainID,
|
||||
types.NewErroringMockPV(),
|
||||
tc.addr,
|
||||
tc.dialer)
|
||||
|
||||
ts = time.Now()
|
||||
proposal = &types.Proposal{Timestamp: ts}
|
||||
)
|
||||
defer validatorEndpoint.Stop()
|
||||
defer serviceEndpoint.Stop()
|
||||
|
||||
err := validatorEndpoint.SignProposal("", proposal)
|
||||
require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
|
||||
|
||||
err = serviceEndpoint.privVal.SignProposal(chainID, proposal)
|
||||
require.Error(t, err)
|
||||
|
||||
err = validatorEndpoint.SignProposal(chainID, proposal)
|
||||
require.Error(t, err)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func TestErrUnexpectedResponse(t *testing.T) {
|
||||
for _, tc := range socketTestCases(t) {
|
||||
func() {
|
||||
var (
|
||||
logger = log.TestingLogger()
|
||||
chainID = cmn.RandStr(12)
|
||||
readyCh = make(chan struct{})
|
||||
errCh = make(chan error, 1)
|
||||
|
||||
serviceEndpoint = NewSignerServiceEndpoint(
|
||||
logger,
|
||||
chainID,
|
||||
types.NewMockPV(),
|
||||
tc.dialer,
|
||||
)
|
||||
|
||||
validatorEndpoint = newSignerValidatorEndpoint(
|
||||
logger,
|
||||
tc.addr,
|
||||
testTimeoutReadWrite)
|
||||
)
|
||||
|
||||
testStartEndpoint(t, readyCh, validatorEndpoint)
|
||||
defer validatorEndpoint.Stop()
|
||||
SignerServiceEndpointTimeoutReadWrite(time.Millisecond)(serviceEndpoint)
|
||||
SignerServiceEndpointConnRetries(100)(serviceEndpoint)
|
||||
// we do not want to Start() the remote signer here and instead use the connection to
|
||||
// reply with intentionally wrong replies below:
|
||||
rsConn, err := serviceEndpoint.connect()
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, rsConn)
|
||||
defer rsConn.Close()
|
||||
|
||||
// send over public key to get the remote signer running:
|
||||
go testReadWriteResponse(t, &PubKeyResponse{}, rsConn)
|
||||
<-readyCh
|
||||
|
||||
// Proposal:
|
||||
go func(errc chan error) {
|
||||
errc <- validatorEndpoint.SignProposal(chainID, &types.Proposal{})
|
||||
}(errCh)
|
||||
|
||||
// read request and write wrong response:
|
||||
go testReadWriteResponse(t, &SignedVoteResponse{}, rsConn)
|
||||
err = <-errCh
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, ErrUnexpectedResponse)
|
||||
|
||||
// Vote:
|
||||
go func(errc chan error) {
|
||||
errc <- validatorEndpoint.SignVote(chainID, &types.Vote{})
|
||||
}(errCh)
|
||||
// read request and write wrong response:
|
||||
go testReadWriteResponse(t, &SignedProposalResponse{}, rsConn)
|
||||
err = <-errCh
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err, ErrUnexpectedResponse)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func TestRetryConnToRemoteSigner(t *testing.T) {
|
||||
for _, tc := range socketTestCases(t) {
|
||||
func() {
|
||||
var (
|
||||
logger = log.TestingLogger()
|
||||
chainID = cmn.RandStr(12)
|
||||
readyCh = make(chan struct{})
|
||||
|
||||
serviceEndpoint = NewSignerServiceEndpoint(
|
||||
logger,
|
||||
chainID,
|
||||
types.NewMockPV(),
|
||||
tc.dialer,
|
||||
)
|
||||
thisConnTimeout = testTimeoutReadWrite
|
||||
validatorEndpoint = newSignerValidatorEndpoint(logger, tc.addr, thisConnTimeout)
|
||||
)
|
||||
// Ping every:
|
||||
SignerValidatorEndpointSetHeartbeat(testTimeoutHeartbeat)(validatorEndpoint)
|
||||
|
||||
SignerServiceEndpointTimeoutReadWrite(testTimeoutReadWrite)(serviceEndpoint)
|
||||
SignerServiceEndpointConnRetries(10)(serviceEndpoint)
|
||||
|
||||
testStartEndpoint(t, readyCh, validatorEndpoint)
|
||||
defer validatorEndpoint.Stop()
|
||||
require.NoError(t, serviceEndpoint.Start())
|
||||
assert.True(t, serviceEndpoint.IsRunning())
|
||||
|
||||
<-readyCh
|
||||
time.Sleep(testTimeoutHeartbeat * 2)
|
||||
|
||||
serviceEndpoint.Stop()
|
||||
rs2 := NewSignerServiceEndpoint(
|
||||
logger,
|
||||
chainID,
|
||||
types.NewMockPV(),
|
||||
tc.dialer,
|
||||
)
|
||||
// let some pings pass
|
||||
time.Sleep(testTimeoutHeartbeat3o2)
|
||||
require.NoError(t, rs2.Start())
|
||||
assert.True(t, rs2.IsRunning())
|
||||
defer rs2.Stop()
|
||||
|
||||
// give the client some time to re-establish the conn to the remote signer
|
||||
// should see sth like this in the logs:
|
||||
//
|
||||
// E[10016-01-10|17:12:46.128] Ping err="remote signer timed out"
|
||||
// I[10016-01-10|17:16:42.447] Re-created connection to remote signer impl=SocketVal
|
||||
time.Sleep(testTimeoutReadWrite * 2)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func newSignerValidatorEndpoint(logger log.Logger, addr string, timeoutReadWrite time.Duration) *SignerValidatorEndpoint {
|
||||
proto, address := cmn.ProtocolAndAddress(addr)
|
||||
|
||||
ln, err := net.Listen(proto, address)
|
||||
logger.Info("Listening at", "proto", proto, "address", address)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
var listener net.Listener
|
||||
|
||||
if proto == "unix" {
|
||||
unixLn := NewUnixListener(ln)
|
||||
UnixListenerTimeoutAccept(testTimeoutAccept)(unixLn)
|
||||
UnixListenerTimeoutReadWrite(timeoutReadWrite)(unixLn)
|
||||
listener = unixLn
|
||||
} else {
|
||||
tcpLn := NewTCPListener(ln, ed25519.GenPrivKey())
|
||||
TCPListenerTimeoutAccept(testTimeoutAccept)(tcpLn)
|
||||
TCPListenerTimeoutReadWrite(timeoutReadWrite)(tcpLn)
|
||||
listener = tcpLn
|
||||
}
|
||||
|
||||
return NewSignerValidatorEndpoint(logger, listener)
|
||||
}
|
||||
|
||||
func testSetupSocketPair(
|
||||
t *testing.T,
|
||||
chainID string,
|
||||
privValidator types.PrivValidator,
|
||||
addr string,
|
||||
socketDialer SocketDialer,
|
||||
) (*SignerValidatorEndpoint, *SignerServiceEndpoint) {
|
||||
var (
|
||||
logger = log.TestingLogger()
|
||||
privVal = privValidator
|
||||
readyc = make(chan struct{})
|
||||
serviceEndpoint = NewSignerServiceEndpoint(
|
||||
logger,
|
||||
chainID,
|
||||
privVal,
|
||||
socketDialer,
|
||||
)
|
||||
|
||||
thisConnTimeout = testTimeoutReadWrite
|
||||
validatorEndpoint = newSignerValidatorEndpoint(logger, addr, thisConnTimeout)
|
||||
)
|
||||
|
||||
SignerValidatorEndpointSetHeartbeat(testTimeoutHeartbeat)(validatorEndpoint)
|
||||
SignerServiceEndpointTimeoutReadWrite(testTimeoutReadWrite)(serviceEndpoint)
|
||||
SignerServiceEndpointConnRetries(1e6)(serviceEndpoint)
|
||||
|
||||
testStartEndpoint(t, readyc, validatorEndpoint)
|
||||
|
||||
require.NoError(t, serviceEndpoint.Start())
|
||||
assert.True(t, serviceEndpoint.IsRunning())
|
||||
|
||||
<-readyc
|
||||
|
||||
return validatorEndpoint, serviceEndpoint
|
||||
}
|
||||
|
||||
func testReadWriteResponse(t *testing.T, resp RemoteSignerMsg, rsConn net.Conn) {
|
||||
_, err := readMsg(rsConn)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = writeMsg(rsConn, resp)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func testStartEndpoint(t *testing.T, readyCh chan struct{}, sc *SignerValidatorEndpoint) {
|
||||
go func(sc *SignerValidatorEndpoint) {
|
||||
require.NoError(t, sc.Start())
|
||||
assert.True(t, sc.IsRunning())
|
||||
|
||||
readyCh <- struct{}{}
|
||||
}(sc)
|
||||
}
|
||||
|
||||
// testFreeTCPAddr claims a free port so we don't block on listener being ready.
|
||||
func testFreeTCPAddr(t *testing.T) string {
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
defer ln.Close()
|
||||
|
||||
return fmt.Sprintf("127.0.0.1:%d", ln.Addr().(*net.TCPAddr).Port)
|
||||
}
|
@@ -1,26 +1,49 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
)
|
||||
|
||||
func getDialerTestCases(t *testing.T) []dialerTestCase {
|
||||
tcpAddr := GetFreeLocalhostAddrPort()
|
||||
unixFilePath, err := testUnixAddr()
|
||||
require.NoError(t, err)
|
||||
unixAddr := fmt.Sprintf("unix://%s", unixFilePath)
|
||||
|
||||
return []dialerTestCase{
|
||||
{
|
||||
addr: tcpAddr,
|
||||
dialer: DialTCPFn(tcpAddr, testTimeoutReadWrite, ed25519.GenPrivKey()),
|
||||
},
|
||||
{
|
||||
addr: unixAddr,
|
||||
dialer: DialUnixFn(unixFilePath),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestIsConnTimeoutForFundamentalTimeouts(t *testing.T) {
|
||||
// Generate a networking timeout
|
||||
dialer := DialTCPFn(testFreeTCPAddr(t), time.Millisecond, ed25519.GenPrivKey())
|
||||
tcpAddr := GetFreeLocalhostAddrPort()
|
||||
dialer := DialTCPFn(tcpAddr, time.Millisecond, ed25519.GenPrivKey())
|
||||
_, err := dialer()
|
||||
assert.Error(t, err)
|
||||
assert.True(t, IsConnTimeout(err))
|
||||
}
|
||||
|
||||
func TestIsConnTimeoutForWrappedConnTimeouts(t *testing.T) {
|
||||
dialer := DialTCPFn(testFreeTCPAddr(t), time.Millisecond, ed25519.GenPrivKey())
|
||||
tcpAddr := GetFreeLocalhostAddrPort()
|
||||
dialer := DialTCPFn(tcpAddr, time.Millisecond, ed25519.GenPrivKey())
|
||||
_, err := dialer()
|
||||
assert.Error(t, err)
|
||||
err = cmn.ErrorWrap(ErrConnTimeout, err.Error())
|
||||
err = cmn.ErrorWrap(ErrConnectionTimeout, err.Error())
|
||||
assert.True(t, IsConnTimeout(err))
|
||||
}
|
||||
|
@@ -9,8 +9,8 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
defaultTimeoutAcceptSeconds = 3
|
||||
defaultTimeoutReadWriteSeconds = 3
|
||||
defaultTimeoutAcceptSeconds = 3
|
||||
defaultPingPeriodMilliseconds = 100
|
||||
)
|
||||
|
||||
// timeoutError can be used to check if an error returned from the netp package
|
||||
|
@@ -1,7 +1,12 @@
|
||||
package privval
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
)
|
||||
|
||||
// IsConnTimeout returns a boolean indicating whether the error is known to
|
||||
@@ -9,7 +14,7 @@ import (
|
||||
// network timeouts, as well as ErrConnTimeout errors.
|
||||
func IsConnTimeout(err error) bool {
|
||||
if cmnErr, ok := err.(cmn.Error); ok {
|
||||
if cmnErr.Data() == ErrConnTimeout {
|
||||
if cmnErr.Data() == ErrConnectionTimeout {
|
||||
return true
|
||||
}
|
||||
}
|
||||
@@ -18,3 +23,39 @@ func IsConnTimeout(err error) bool {
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// NewSignerListener creates a new SignerListenerEndpoint using the corresponding listen address
|
||||
func NewSignerListener(listenAddr string, logger log.Logger) (*SignerListenerEndpoint, error) {
|
||||
var listener net.Listener
|
||||
|
||||
protocol, address := cmn.ProtocolAndAddress(listenAddr)
|
||||
ln, err := net.Listen(protocol, address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch protocol {
|
||||
case "unix":
|
||||
listener = NewUnixListener(ln)
|
||||
case "tcp":
|
||||
// TODO: persist this key so external signer can actually authenticate us
|
||||
listener = NewTCPListener(ln, ed25519.GenPrivKey())
|
||||
default:
|
||||
return nil, fmt.Errorf(
|
||||
"wrong listen address: expected either 'tcp' or 'unix' protocols, got %s",
|
||||
protocol,
|
||||
)
|
||||
}
|
||||
|
||||
pve := NewSignerListenerEndpoint(logger.With("module", "privval"), listener)
|
||||
|
||||
return pve, nil
|
||||
}
|
||||
|
||||
// GetFreeLocalhostAddrPort returns a free localhost:port address
|
||||
func GetFreeLocalhostAddrPort() string {
|
||||
port, err := cmn.GetFreePort()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return fmt.Sprintf("127.0.0.1:%d", port)
|
||||
}
|
||||
|
@@ -5,6 +5,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
)
|
||||
|
||||
|
@@ -160,7 +160,7 @@ func validatePage(page, perPage, totalCount int) (int, error) {
|
||||
pages = 1 // one page (even if it's empty)
|
||||
}
|
||||
if page < 0 || page > pages {
|
||||
return 1, fmt.Errorf("page should be within [0, %d] range, given %d", pages, page)
|
||||
return 1, fmt.Errorf("page should be within [1, %d] range, given %d", pages, page)
|
||||
}
|
||||
|
||||
return page, nil
|
||||
|
@@ -49,7 +49,7 @@ var _ error = (*TestHarnessError)(nil)
|
||||
// with this version of Tendermint.
|
||||
type TestHarness struct {
|
||||
addr string
|
||||
spv *privval.SignerValidatorEndpoint
|
||||
signerClient *privval.SignerClient
|
||||
fpv *privval.FilePV
|
||||
chainID string
|
||||
acceptRetries int
|
||||
@@ -101,14 +101,19 @@ func NewTestHarness(logger log.Logger, cfg TestHarnessConfig) (*TestHarness, err
|
||||
}
|
||||
logger.Info("Loaded genesis file", "chainID", st.ChainID)
|
||||
|
||||
spv, err := newTestHarnessSocketVal(logger, cfg)
|
||||
spv, err := newTestHarnessListener(logger, cfg)
|
||||
if err != nil {
|
||||
return nil, newTestHarnessError(ErrFailedToCreateListener, err, "")
|
||||
}
|
||||
|
||||
signerClient, err := privval.NewSignerClient(spv)
|
||||
if err != nil {
|
||||
return nil, newTestHarnessError(ErrFailedToCreateListener, err, "")
|
||||
}
|
||||
|
||||
return &TestHarness{
|
||||
addr: cfg.BindAddr,
|
||||
spv: spv,
|
||||
signerClient: signerClient,
|
||||
fpv: fpv,
|
||||
chainID: st.ChainID,
|
||||
acceptRetries: cfg.AcceptRetries,
|
||||
@@ -135,9 +140,11 @@ func (th *TestHarness) Run() {
|
||||
th.logger.Info("Starting test harness")
|
||||
accepted := false
|
||||
var startErr error
|
||||
|
||||
for acceptRetries := th.acceptRetries; acceptRetries > 0; acceptRetries-- {
|
||||
th.logger.Info("Attempting to accept incoming connection", "acceptRetries", acceptRetries)
|
||||
if err := th.spv.Start(); err != nil {
|
||||
|
||||
if err := th.signerClient.WaitForConnection(10 * time.Millisecond); err != nil {
|
||||
// if it wasn't a timeout error
|
||||
if _, ok := err.(timeoutError); !ok {
|
||||
th.logger.Error("Failed to start listener", "err", err)
|
||||
@@ -149,6 +156,7 @@ func (th *TestHarness) Run() {
|
||||
}
|
||||
startErr = err
|
||||
} else {
|
||||
th.logger.Info("Accepted external connection")
|
||||
accepted = true
|
||||
break
|
||||
}
|
||||
@@ -182,8 +190,8 @@ func (th *TestHarness) Run() {
|
||||
func (th *TestHarness) TestPublicKey() error {
|
||||
th.logger.Info("TEST: Public key of remote signer")
|
||||
th.logger.Info("Local", "pubKey", th.fpv.GetPubKey())
|
||||
th.logger.Info("Remote", "pubKey", th.spv.GetPubKey())
|
||||
if th.fpv.GetPubKey() != th.spv.GetPubKey() {
|
||||
th.logger.Info("Remote", "pubKey", th.signerClient.GetPubKey())
|
||||
if th.fpv.GetPubKey() != th.signerClient.GetPubKey() {
|
||||
th.logger.Error("FAILED: Local and remote public keys do not match")
|
||||
return newTestHarnessError(ErrTestPublicKeyFailed, nil, "")
|
||||
}
|
||||
@@ -211,7 +219,7 @@ func (th *TestHarness) TestSignProposal() error {
|
||||
Timestamp: time.Now(),
|
||||
}
|
||||
propBytes := prop.SignBytes(th.chainID)
|
||||
if err := th.spv.SignProposal(th.chainID, prop); err != nil {
|
||||
if err := th.signerClient.SignProposal(th.chainID, prop); err != nil {
|
||||
th.logger.Error("FAILED: Signing of proposal", "err", err)
|
||||
return newTestHarnessError(ErrTestSignProposalFailed, err, "")
|
||||
}
|
||||
@@ -222,7 +230,7 @@ func (th *TestHarness) TestSignProposal() error {
|
||||
return newTestHarnessError(ErrTestSignProposalFailed, err, "")
|
||||
}
|
||||
// now validate the signature on the proposal
|
||||
if th.spv.GetPubKey().VerifyBytes(propBytes, prop.Signature) {
|
||||
if th.signerClient.GetPubKey().VerifyBytes(propBytes, prop.Signature) {
|
||||
th.logger.Info("Successfully validated proposal signature")
|
||||
} else {
|
||||
th.logger.Error("FAILED: Proposal signature validation failed")
|
||||
@@ -255,7 +263,7 @@ func (th *TestHarness) TestSignVote() error {
|
||||
}
|
||||
voteBytes := vote.SignBytes(th.chainID)
|
||||
// sign the vote
|
||||
if err := th.spv.SignVote(th.chainID, vote); err != nil {
|
||||
if err := th.signerClient.SignVote(th.chainID, vote); err != nil {
|
||||
th.logger.Error("FAILED: Signing of vote", "err", err)
|
||||
return newTestHarnessError(ErrTestSignVoteFailed, err, fmt.Sprintf("voteType=%d", voteType))
|
||||
}
|
||||
@@ -266,7 +274,7 @@ func (th *TestHarness) TestSignVote() error {
|
||||
return newTestHarnessError(ErrTestSignVoteFailed, err, fmt.Sprintf("voteType=%d", voteType))
|
||||
}
|
||||
// now validate the signature on the proposal
|
||||
if th.spv.GetPubKey().VerifyBytes(voteBytes, vote.Signature) {
|
||||
if th.signerClient.GetPubKey().VerifyBytes(voteBytes, vote.Signature) {
|
||||
th.logger.Info("Successfully validated vote signature", "type", voteType)
|
||||
} else {
|
||||
th.logger.Error("FAILED: Vote signature validation failed", "type", voteType)
|
||||
@@ -301,10 +309,9 @@ func (th *TestHarness) Shutdown(err error) {
|
||||
}()
|
||||
}
|
||||
|
||||
if th.spv.IsRunning() {
|
||||
if err := th.spv.Stop(); err != nil {
|
||||
th.logger.Error("Failed to cleanly stop listener: %s", err.Error())
|
||||
}
|
||||
err = th.signerClient.Close()
|
||||
if err != nil {
|
||||
th.logger.Error("Failed to cleanly stop listener: %s", err.Error())
|
||||
}
|
||||
|
||||
if th.exitWhenComplete {
|
||||
@@ -312,9 +319,8 @@ func (th *TestHarness) Shutdown(err error) {
|
||||
}
|
||||
}
|
||||
|
||||
// newTestHarnessSocketVal creates our client instance which we will use for
|
||||
// testing.
|
||||
func newTestHarnessSocketVal(logger log.Logger, cfg TestHarnessConfig) (*privval.SignerValidatorEndpoint, error) {
|
||||
// newTestHarnessListener creates our client instance which we will use for testing.
|
||||
func newTestHarnessListener(logger log.Logger, cfg TestHarnessConfig) (*privval.SignerListenerEndpoint, error) {
|
||||
proto, addr := cmn.ProtocolAndAddress(cfg.BindAddr)
|
||||
if proto == "unix" {
|
||||
// make sure the socket doesn't exist - if so, try to delete it
|
||||
@@ -329,7 +335,7 @@ func newTestHarnessSocketVal(logger log.Logger, cfg TestHarnessConfig) (*privval
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
logger.Info("Listening at", "proto", proto, "addr", addr)
|
||||
logger.Info("Listening", "proto", proto, "addr", addr)
|
||||
var svln net.Listener
|
||||
switch proto {
|
||||
case "unix":
|
||||
@@ -347,7 +353,7 @@ func newTestHarnessSocketVal(logger log.Logger, cfg TestHarnessConfig) (*privval
|
||||
logger.Error("Unsupported protocol (must be unix:// or tcp://)", "proto", proto)
|
||||
return nil, newTestHarnessError(ErrInvalidParameters, nil, fmt.Sprintf("Unsupported protocol: %s", proto))
|
||||
}
|
||||
return privval.NewSignerValidatorEndpoint(logger, svln), nil
|
||||
return privval.NewSignerListenerEndpoint(logger, svln), nil
|
||||
}
|
||||
|
||||
func newTestHarnessError(code int, err error, info string) *TestHarnessError {
|
||||
|
@@ -3,19 +3,18 @@ package internal
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
"github.com/tendermint/tendermint/libs/log"
|
||||
"github.com/tendermint/tendermint/privval"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -85,8 +84,8 @@ func TestRemoteSignerTestHarnessMaxAcceptRetriesReached(t *testing.T) {
|
||||
func TestRemoteSignerTestHarnessSuccessfulRun(t *testing.T) {
|
||||
harnessTest(
|
||||
t,
|
||||
func(th *TestHarness) *privval.SignerServiceEndpoint {
|
||||
return newMockRemoteSigner(t, th, th.fpv.Key.PrivKey, false, false)
|
||||
func(th *TestHarness) *privval.SignerServer {
|
||||
return newMockSignerServer(t, th, th.fpv.Key.PrivKey, false, false)
|
||||
},
|
||||
NoError,
|
||||
)
|
||||
@@ -95,8 +94,8 @@ func TestRemoteSignerTestHarnessSuccessfulRun(t *testing.T) {
|
||||
func TestRemoteSignerPublicKeyCheckFailed(t *testing.T) {
|
||||
harnessTest(
|
||||
t,
|
||||
func(th *TestHarness) *privval.SignerServiceEndpoint {
|
||||
return newMockRemoteSigner(t, th, ed25519.GenPrivKey(), false, false)
|
||||
func(th *TestHarness) *privval.SignerServer {
|
||||
return newMockSignerServer(t, th, ed25519.GenPrivKey(), false, false)
|
||||
},
|
||||
ErrTestPublicKeyFailed,
|
||||
)
|
||||
@@ -105,8 +104,8 @@ func TestRemoteSignerPublicKeyCheckFailed(t *testing.T) {
|
||||
func TestRemoteSignerProposalSigningFailed(t *testing.T) {
|
||||
harnessTest(
|
||||
t,
|
||||
func(th *TestHarness) *privval.SignerServiceEndpoint {
|
||||
return newMockRemoteSigner(t, th, th.fpv.Key.PrivKey, true, false)
|
||||
func(th *TestHarness) *privval.SignerServer {
|
||||
return newMockSignerServer(t, th, th.fpv.Key.PrivKey, true, false)
|
||||
},
|
||||
ErrTestSignProposalFailed,
|
||||
)
|
||||
@@ -115,28 +114,30 @@ func TestRemoteSignerProposalSigningFailed(t *testing.T) {
|
||||
func TestRemoteSignerVoteSigningFailed(t *testing.T) {
|
||||
harnessTest(
|
||||
t,
|
||||
func(th *TestHarness) *privval.SignerServiceEndpoint {
|
||||
return newMockRemoteSigner(t, th, th.fpv.Key.PrivKey, false, true)
|
||||
func(th *TestHarness) *privval.SignerServer {
|
||||
return newMockSignerServer(t, th, th.fpv.Key.PrivKey, false, true)
|
||||
},
|
||||
ErrTestSignVoteFailed,
|
||||
)
|
||||
}
|
||||
|
||||
func newMockRemoteSigner(t *testing.T, th *TestHarness, privKey crypto.PrivKey, breakProposalSigning bool, breakVoteSigning bool) *privval.SignerServiceEndpoint {
|
||||
return privval.NewSignerServiceEndpoint(
|
||||
func newMockSignerServer(t *testing.T, th *TestHarness, privKey crypto.PrivKey, breakProposalSigning bool, breakVoteSigning bool) *privval.SignerServer {
|
||||
mockPV := types.NewMockPVWithParams(privKey, breakProposalSigning, breakVoteSigning)
|
||||
|
||||
dialerEndpoint := privval.NewSignerDialerEndpoint(
|
||||
th.logger,
|
||||
th.chainID,
|
||||
types.NewMockPVWithParams(privKey, breakProposalSigning, breakVoteSigning),
|
||||
privval.DialTCPFn(
|
||||
th.addr,
|
||||
time.Duration(defaultConnDeadline)*time.Millisecond,
|
||||
ed25519.GenPrivKey(),
|
||||
),
|
||||
)
|
||||
|
||||
return privval.NewSignerServer(dialerEndpoint, th.chainID, mockPV)
|
||||
}
|
||||
|
||||
// For running relatively standard tests.
|
||||
func harnessTest(t *testing.T, rsMaker func(th *TestHarness) *privval.SignerServiceEndpoint, expectedExitCode int) {
|
||||
func harnessTest(t *testing.T, signerServerMaker func(th *TestHarness) *privval.SignerServer, expectedExitCode int) {
|
||||
cfg := makeConfig(t, 100, 3)
|
||||
defer cleanup(cfg)
|
||||
|
||||
@@ -148,10 +149,10 @@ func harnessTest(t *testing.T, rsMaker func(th *TestHarness) *privval.SignerServ
|
||||
th.Run()
|
||||
}()
|
||||
|
||||
rs := rsMaker(th)
|
||||
require.NoError(t, rs.Start())
|
||||
assert.True(t, rs.IsRunning())
|
||||
defer rs.Stop()
|
||||
ss := signerServerMaker(th)
|
||||
require.NoError(t, ss.Start())
|
||||
assert.True(t, ss.IsRunning())
|
||||
defer ss.Stop()
|
||||
|
||||
<-donec
|
||||
assert.Equal(t, expectedExitCode, th.exitCode)
|
||||
@@ -159,7 +160,7 @@ func harnessTest(t *testing.T, rsMaker func(th *TestHarness) *privval.SignerServ
|
||||
|
||||
func makeConfig(t *testing.T, acceptDeadline, acceptRetries int) TestHarnessConfig {
|
||||
return TestHarnessConfig{
|
||||
BindAddr: testFreeTCPAddr(t),
|
||||
BindAddr: privval.GetFreeLocalhostAddrPort(),
|
||||
KeyFile: makeTempFile("tm-testharness-keyfile", keyFileContents),
|
||||
StateFile: makeTempFile("tm-testharness-statefile", stateFileContents),
|
||||
GenesisFile: makeTempFile("tm-testharness-genesisfile", genesisFileContents),
|
||||
@@ -191,12 +192,3 @@ func makeTempFile(name, content string) string {
|
||||
}
|
||||
return tempFile.Name()
|
||||
}
|
||||
|
||||
// testFreeTCPAddr claims a free port so we don't block on listener being ready.
|
||||
func testFreeTCPAddr(t *testing.T) string {
|
||||
ln, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
defer ln.Close()
|
||||
|
||||
return fmt.Sprintf("127.0.0.1:%d", ln.Addr().(*net.TCPAddr).Port)
|
||||
}
|
||||
|
@@ -12,6 +12,7 @@ import (
|
||||
// PrivValidator defines the functionality of a local Tendermint validator
|
||||
// that signs votes and proposals, and never double signs.
|
||||
type PrivValidator interface {
|
||||
// TODO: Extend the interface to return errors too. Issue: https://github.com/tendermint/tendermint/issues/3602
|
||||
GetPubKey() crypto.PubKey
|
||||
|
||||
SignVote(chainID string, vote *Vote) error
|
||||
|
@@ -20,7 +20,7 @@ const (
|
||||
// Must be a string because scripts like dist.sh read this file.
|
||||
// XXX: Don't change the name of this variable or you will break
|
||||
// automation :)
|
||||
TMCoreSemVer = "0.32.1"
|
||||
TMCoreSemVer = "0.32.2"
|
||||
|
||||
// ABCISemVer is the semantic version of the ABCI library
|
||||
ABCISemVer = "0.16.1"
|
||||
|
Reference in New Issue
Block a user