Compare commits

...

26 Commits

Author SHA1 Message Date
Anton Kaliaev
a507d2a019 rpc: correct error message 2019-08-06 17:17:38 +04:00
Anton Kaliaev
64e38e279e fix changelog msg 2019-08-06 17:17:28 +04:00
Sean Braithwaite
0cf8812b17 [blockchain] v2 riri Schedule composite data structure (#3848)
* Add Schedule:

    + The schedule is a data structure used to determine the optimal
      assignment of block requests to peers so that fast-sync completes
      as quickly and efficiently as possible.

* Add some doc strings

* fix golangci

* remove globals from tests
2019-08-05 17:26:46 +02:00
Juan Leni
f7f034a8be privval: refactor Remote signers (#3370)
This PR is related to #3107 and a continuation of #3351

It is important to emphasise that in the original privval design, the client/server and listening/dialing roles are inverted and do not follow the conventional interaction pattern.

Given two hosts A and B:

    Host A is listener/client
    Host B is dialer/server (contains the secret key)
    When A requires a signature, it needs to wait for B to dial in before it can issue a request.
    A only accepts a single connection and any failure leads to dropping the connection and waiting for B to reconnect.

The original rationale behind this design was based on security.

    Host B only allows outbound connections to a list of whitelisted hosts.
    It is not possible to reach B unless B dials in. There are no listening/open ports in B.

This PR results in the following changes:

    Refactors ping/heartbeat to avoid previously existing race conditions.
    Separates transport (dialer/listener) from signing (client/server) concerns to simplify the workflow.
    Unifies and abstracts away the differences between unix and tcp sockets.
    A single signer endpoint implementation unifies connection handling code (read/write/close/connection object).
    The signer request handler (server side) is customizable to increase testability.
    Updates and extends unit tests.

A high level overview of the classes is as follows:

Transport (endpoints): The following classes take care of establishing a connection

    SignerDialerEndpoint
    SignerListeningEndpoint
    SignerEndpoint groups common functionality (read/write/timeouts/etc.)

Signing (client/server): The following classes take care of exchanging request/responses

    SignerClient
    SignerServer

This PR also closes #3601

Commits:

* refactoring - work in progress

* reworking unit tests

* Encapsulating and fixing unit tests

* Improve tests

* Clean up

* Fix/improve unit tests

* clean up tests

* Improving service endpoint

* fixing unit test

* fix linter issues

* avoid invalid cache values (improve later?)

* complete implementation

* wip

* improved connection loop

* Improve reconnections + fixing unit tests

* addressing comments

* small formatting changes

* clean up

* Update node/node.go

Co-Authored-By: jleni <juan.leni@zondax.ch>

* Update privval/signer_client.go

Co-Authored-By: jleni <juan.leni@zondax.ch>

* Update privval/signer_client_test.go

Co-Authored-By: jleni <juan.leni@zondax.ch>

* check during initialization

* dropping connecting when writing fails

* removing break

* use t.log instead

* unifying and using cmn.GetFreePort()

* review fixes

* reordering and unifying drop connection

* closing instead of signalling

* refactored service loop

* removed superfluous brackets

* GetPubKey can return errors

* Revert "GetPubKey can return errors"

This reverts commit 68c06f19b4650389d7e5ab1659b318889028202c.

* adding entry to changelog

* Update CHANGELOG_PENDING.md

Co-Authored-By: jleni <juan.leni@zondax.ch>

* Update privval/signer_client.go

Co-Authored-By: jleni <juan.leni@zondax.ch>

* Update privval/signer_dialer_endpoint.go

Co-Authored-By: jleni <juan.leni@zondax.ch>

* Update privval/signer_dialer_endpoint.go

Co-Authored-By: jleni <juan.leni@zondax.ch>

* Update privval/signer_dialer_endpoint.go

Co-Authored-By: jleni <juan.leni@zondax.ch>

* Update privval/signer_dialer_endpoint.go

Co-Authored-By: jleni <juan.leni@zondax.ch>

* Update privval/signer_listener_endpoint_test.go

Co-Authored-By: jleni <juan.leni@zondax.ch>

* updating node.go

* review fixes

* fixes linter

* fixing unit test

* small fixes in comments

* addressing review comments

* addressing review comments 2

* reverting suggestion

* Update privval/signer_client_test.go

Co-Authored-By: Anton Kaliaev <anton.kalyaev@gmail.com>

* Update privval/signer_client_test.go

Co-Authored-By: Anton Kaliaev <anton.kalyaev@gmail.com>

* Update privval/signer_listener_endpoint_test.go

Co-Authored-By: Anton Kaliaev <anton.kalyaev@gmail.com>

* do not expose brokenSignerDialerEndpoint

* clean up logging

* unifying methods

shorten test time
signer also drops

* reenabling pings

* improving testability + unit test

* fixing go fmt + unit test

* remove unused code

* Addressing review comments

* simplifying connection workflow

* fix linter/go import issue

* using base service quit

* updating comment

* Simplifying design + adjusting names

* fixing linter issues

* refactoring test harness + fixes

* Addressing review comments

* cleaning up

* adding additional error check
2019-08-05 19:09:10 +04:00
Anton Kaliaev
4da3de79a7 consensus: reduce "Error attempting to add vote" message severity (Er… (#3871)
* consensus: reduce "Error attempting to add vote" message severity (Error -> Info)

Fixes #3839

* add missing changelog entry
2019-08-03 00:00:18 +04:00
Anton Kaliaev
f5b116c687 config: move max_msg_bytes into mempool section (#3869)
* config: move max_msg_bytes into mempool section

It was incorrectly placed in the fastsync section during the
4d7cd8055b (diff-092cdc48047eeb4c0bca311a2e1b8ae6)
merge.

Fixes #3868

* add changelog entry
2019-08-02 23:42:17 +04:00
Anton Kaliaev
be06316c84 Merge pull request #3870 from tendermint/release/v0.32.2
Merge v0.32.2 release back to master
2019-08-02 23:30:37 +04:00
Jack Zampolin
8ba8497ac8 Merge branch 'master' into release/v0.32.2 2019-08-02 08:52:21 -07:00
Marko
23fa2e1f1b Merge PR #3860: Update log v0.32.2
* changelog updates

* pr comments
2019-08-01 09:33:27 -07:00
Ethan Buchman
ba9cdeaed9 Merge branch 'v0.32' into release/v0.32.2 2019-07-31 17:26:40 -04:00
Marko Baricevic
8ed1400949 release for v0.32.2 2019-07-31 17:30:11 +02:00
Ethan Buchman
dbf4062acd Merge pull request #3807 from tendermint/release/v0.32.1
Release/v0.32.1
2019-07-15 12:25:35 -04:00
Ethan Buchman
79e924b34f Merge branch 'master' into release/v0.32.1 2019-07-15 11:42:36 -04:00
Marko
0c9a284f8d Marko/update release1 (#3806)
* pr comments

* remove empty space
2019-07-15 11:42:08 -04:00
Marko
245e1c9ef7 pr comments (#3803)
* pr comments

* abci changelog change
2019-07-15 11:31:50 -04:00
Marko Baricevic
470f1efcc8 merge master 2019-07-15 11:34:01 +02:00
Marko Baricevic
4e3b6bfb26 Merge remote-tracking branch 'origin' into marko/v0.32.1 2019-07-13 13:40:04 +02:00
Marko Baricevic
7a86e49312 moved abci change to features as it doesnt break the abci 2019-07-13 13:39:11 +02:00
Marko Baricevic
744d65f173 Merge branch 'master' into marko/v0.32.1 2019-07-12 19:30:11 +02:00
Marko Baricevic
1b69c6b56b include doc change for abci/app 2019-07-12 15:59:53 +02:00
Marko Baricevic
e5084a4787 Merge branch 'master' into marko/v0.32.1 2019-07-12 15:51:14 +02:00
Marko Baricevic
0787b79347 add link bump abci Version 2019-07-12 11:17:20 +02:00
Marko
823d916a11 Apply suggestions from code review
Comment from PR

Co-Authored-By: Ethan Buchman <ethan@coinculture.info>
2019-07-12 11:06:27 +02:00
Marko Baricevic
e8926867d8 wording change 2019-07-11 21:22:14 +02:00
Marko Baricevic
0f076e5fbe add links to changelog 2019-07-11 18:37:46 +02:00
Marko Baricevic
ac232caef3 Release branch for v0.32.1
Signed-off-by: Marko Baricevic <marbar3778@yahoo.com>
2019-07-11 18:34:21 +02:00
38 changed files with 2091 additions and 1288 deletions

.gitignore

@@ -43,3 +43,5 @@ terraform.tfstate.backup
terraform.tfstate.d
.vscode
profile.out


@@ -1,10 +1,48 @@
# Changelog
## v0.32.2
*July 31, 2019*
Special thanks to external contributors on this release:
@ruseinov, @bluele, @guagualvcha
Friendly reminder, we have a [bug bounty
program](https://hackerone.com/tendermint).
### BREAKING CHANGES:
- Go API
- [libs] [\#3811](https://github.com/tendermint/tendermint/issues/3811) Remove `db` from libs in favor of `https://github.com/tendermint/tm-db`
### FEATURES:
- [node] [\#3846](https://github.com/tendermint/tendermint/pull/3846) Allow replacing existing p2p.Reactor(s) using [`CustomReactors`
option](https://godoc.org/github.com/tendermint/tendermint/node#CustomReactors).
Warning: beware of accidental name clashes. Here is the list of existing
reactors: MEMPOOL, BLOCKCHAIN, CONSENSUS, EVIDENCE, PEX.
- [p2p] [\#3834](https://github.com/tendermint/tendermint/issues/3834) Do not write 'Couldn't connect to any seeds' error log if there are no seeds in config file
- [rpc] [\#3818](https://github.com/tendermint/tendermint/issues/3818) Make `max_body_bytes` and `max_header_bytes` configurable (@bluele)
- [mempool] [\#3826](https://github.com/tendermint/tendermint/issues/3826) Make `max_msg_bytes` configurable (@bluele)
- [blockchain] [\#3561](https://github.com/tendermint/tendermint/issues/3561) Add an early version of the new blockchain reactor, which is supposed to be more modular and testable compared to the old version. To try it, change `version` in the config file ([here](https://github.com/tendermint/tendermint/blob/master/config/toml.go#L303)). NOTE: it's not ready for production yet. For further information, see [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md) & [ADR-43](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-043-blockchain-riri-org.md)
### IMPROVEMENTS:
- [abci] [\#3809](https://github.com/tendermint/tendermint/issues/3809) Recover from application panics in `server/socket_server.go` to allow socket cleanup (@ruseinov)
- [rpc] [\#2252](https://github.com/tendermint/tendermint/issues/2252) Add `/broadcast_evidence` endpoint to submit double signing and other types of evidence
- [p2p] [\#3664](https://github.com/tendermint/tendermint/issues/3664) p2p/conn: reuse buffer when write/read from secret connection (@guagualvcha)
- [rpc] [\#3076](https://github.com/tendermint/tendermint/issues/3076) Improve transaction search performance
### BUG FIXES:
- [p2p] [\#3644](https://github.com/tendermint/tendermint/issues/3644) Fix error logging for connection stop (@defunctzombie)
- [rpc] [\#3813](https://github.com/tendermint/tendermint/issues/3813) Return err if page is incorrect (less than 0 or greater than total pages)
## v0.32.1
*July 15, 2019*
Special thanks to external contributors on this release:
@ParthDesai, @climber73, @jim380, @ashleyvega
This release contains a minor enhancement to the ABCI and some breaking changes to our libs folder, namely:
@@ -26,7 +64,7 @@ program](https://hackerone.com/tendermint).
### FEATURES:
- [node] Add variadic argument to `NewNode` to support functional options, allowing the Node to be more easily customized.
- [node] [\#3730](https://github.com/tendermint/tendermint/pull/3730) Add `CustomReactors` option to `NewNode` allowing caller to pass
custom reactors to run inside Tendermint node (@ParthDesai)
- [abci] [\#2127](https://github.com/tendermint/tendermint/issues/2127) `RequestCheckTx` has a new field, `CheckTxType`, which can take values of `CheckTxType_New` and `CheckTxType_Recheck`, indicating whether this is a new tx being checked for the first time or a tx being rechecked after a block commit. This allows applications to skip certain expensive operations, like signature checking, if they've already been done once. See [docs](https://github.com/tendermint/tendermint/blob/eddb433d7c082efbeaf8974413a36641519ee895/docs/spec/abci/apps.md#mempool-connection)


@@ -1,4 +1,4 @@
## v0.32.2
## v0.32.3
**
@@ -14,27 +14,14 @@ program](https://hackerone.com/tendermint).
- Apps
- Go API
- [libs] \#3811 Remove `db` from libs in favor of `https://github.com/tendermint/tm-db`
### FEATURES:
- [node] Allow replacing existing p2p.Reactor(s) using [`CustomReactors`
option](https://godoc.org/github.com/tendermint/tendermint/node#CustomReactors).
Warning: beware of accidental name clashes. Here is the list of existing
reactors: MEMPOOL, BLOCKCHAIN, CONSENSUS, EVIDENCE, PEX.
### IMPROVEMENTS:
- [p2p] \#3834 Do not write 'Couldn't connect to any seeds' error log if there are no seeds in config file
- [abci] \#3809 Recover from application panics in `server/socket_server.go` to allow socket cleanup (@ruseinov)
- [rpc] \#2252 Add `/broadcast_evidence` endpoint to submit double signing and other types of evidence
- [rpc] \#3818 Make `max_body_bytes` and `max_header_bytes` configurable
- [p2p] \#3664 p2p/conn: reuse buffer when write/read from secret connection
- [mempool] \#3826 Make `max_msg_bytes` configurable
- [blockchain] \#3561 Add an early version of the new blockchain reactor, which is supposed to be more modular and testable compared to the old version. To try it, change `version` in the config file ([here](https://github.com/tendermint/tendermint/blob/master/config/toml.go#L303)). NOTE: it's not ready for production yet. For further information, see [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md) & [ADR-43](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-043-blockchain-riri-org.md)
- [rpc] \#3076 Improve transaction search performance
- [consensus] \#3839 Reduce "Error attempting to add vote" message severity (Error -> Info)
- [privval] \#3370 Refactors and simplifies validator/kms connection handling. Please refer to [this comment](https://github.com/tendermint/tendermint/pull/3370#issue-257360971) for details.
### BUG FIXES:
- [p2p] [\#3644](https://github.com/tendermint/tendermint/pull/3644) Fix error logging for connection stop (@defunctzombie)
- [rpc] \#3813 Return err if page is incorrect (less than 0 or greater than total pages)
- [config] \#3868 Move misplaced `max_msg_bytes` into the mempool section

blockchain/v2/schedule.go (new file)

@@ -0,0 +1,387 @@
// nolint:unused
package v2
import (
"fmt"
"math"
"math/rand"
"time"
"github.com/tendermint/tendermint/p2p"
)
type Event interface{}
type blockState int
const (
blockStateUnknown blockState = iota
blockStateNew
blockStatePending
blockStateReceived
blockStateProcessed
)
func (e blockState) String() string {
switch e {
case blockStateUnknown:
return "Unknown"
case blockStateNew:
return "New"
case blockStatePending:
return "Pending"
case blockStateReceived:
return "Received"
case blockStateProcessed:
return "Processed"
default:
return fmt.Sprintf("unknown blockState: %d", e)
}
}
type peerState int
const (
peerStateNew = iota
peerStateReady
peerStateRemoved
)
func (e peerState) String() string {
switch e {
case peerStateNew:
return "New"
case peerStateReady:
return "Ready"
case peerStateRemoved:
return "Removed"
default:
return fmt.Sprintf("unknown peerState: %d", e)
}
}
type scPeer struct {
peerID p2p.ID
state peerState
height int64
lastTouched time.Time
lastRate int64
}
func newScPeer(peerID p2p.ID) *scPeer {
return &scPeer{
peerID: peerID,
state: peerStateNew,
height: -1,
lastTouched: time.Time{},
}
}
// The schedule is a composite data structure which allows a scheduler to keep
// track of which blocks have been scheduled into which state.
type schedule struct {
initHeight int64
// a list of blocks in which blockState
blockStates map[int64]blockState
// a map of peerID to schedule specific peer struct `scPeer` used to keep
// track of peer specific state
peers map[p2p.ID]*scPeer
// a map of heights to the peer we are waiting for a response from
pendingBlocks map[int64]p2p.ID
// the time at which a block was put in blockStatePending
pendingTime map[int64]time.Time
// the peerID of the peer which put the block in blockStateReceived
receivedBlocks map[int64]p2p.ID
}
func newSchedule(initHeight int64) *schedule {
sc := schedule{
initHeight: initHeight,
blockStates: make(map[int64]blockState),
peers: make(map[p2p.ID]*scPeer),
pendingBlocks: make(map[int64]p2p.ID),
pendingTime: make(map[int64]time.Time),
receivedBlocks: make(map[int64]p2p.ID),
}
sc.setStateAtHeight(initHeight, blockStateNew)
return &sc
}
func (sc *schedule) addPeer(peerID p2p.ID) error {
if _, ok := sc.peers[peerID]; ok {
return fmt.Errorf("Cannot add duplicate peer %s", peerID)
}
sc.peers[peerID] = newScPeer(peerID)
return nil
}
func (sc *schedule) touchPeer(peerID p2p.ID, time time.Time) error {
peer, ok := sc.peers[peerID]
if !ok {
return fmt.Errorf("Couldn't find peer %s", peerID)
}
if peer.state == peerStateRemoved {
return fmt.Errorf("Tried to touch peer in peerStateRemoved")
}
peer.lastTouched = time
return nil
}
func (sc *schedule) removePeer(peerID p2p.ID) error {
peer, ok := sc.peers[peerID]
if !ok {
return fmt.Errorf("Couldn't find peer %s", peerID)
}
if peer.state == peerStateRemoved {
return fmt.Errorf("Tried to remove peer %s in peerStateRemoved", peerID)
}
for height, pendingPeerID := range sc.pendingBlocks {
if pendingPeerID == peerID {
sc.setStateAtHeight(height, blockStateNew)
delete(sc.pendingTime, height)
delete(sc.pendingBlocks, height)
}
}
for height, rcvPeerID := range sc.receivedBlocks {
if rcvPeerID == peerID {
sc.setStateAtHeight(height, blockStateNew)
delete(sc.receivedBlocks, height)
}
}
peer.state = peerStateRemoved
return nil
}
func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error {
peer, ok := sc.peers[peerID]
if !ok {
return fmt.Errorf("Can't find peer %s", peerID)
}
if peer.state == peerStateRemoved {
return fmt.Errorf("Cannot set peer height for a peer in peerStateRemoved")
}
if height < peer.height {
return fmt.Errorf("Cannot move peer height lower. from %d to %d", peer.height, height)
}
peer.height = height
peer.state = peerStateReady
for i := sc.minHeight(); i <= height; i++ {
if sc.getStateAtHeight(i) == blockStateUnknown {
sc.setStateAtHeight(i, blockStateNew)
}
}
return nil
}
func (sc *schedule) getStateAtHeight(height int64) blockState {
if height < sc.initHeight {
return blockStateProcessed
} else if state, ok := sc.blockStates[height]; ok {
return state
} else {
return blockStateUnknown
}
}
func (sc *schedule) getPeersAtHeight(height int64) []*scPeer {
peers := []*scPeer{}
for _, peer := range sc.peers {
if peer.height >= height {
peers = append(peers, peer)
}
}
return peers
}
func (sc *schedule) peersInactiveSince(duration time.Duration, now time.Time) []p2p.ID {
peers := []p2p.ID{}
for _, peer := range sc.peers {
if now.Sub(peer.lastTouched) > duration {
peers = append(peers, peer.peerID)
}
}
return peers
}
func (sc *schedule) peersSlowerThan(minSpeed int64) []p2p.ID {
peers := []p2p.ID{}
for _, peer := range sc.peers {
if peer.lastRate < minSpeed {
peers = append(peers, peer.peerID)
}
}
return peers
}
func (sc *schedule) setStateAtHeight(height int64, state blockState) {
sc.blockStates[height] = state
}
func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error {
peer, ok := sc.peers[peerID]
if !ok {
return fmt.Errorf("Can't find peer %s", peerID)
}
if peer.state == peerStateRemoved {
return fmt.Errorf("Cannot receive blocks from removed peer %s", peerID)
}
if state := sc.getStateAtHeight(height); state != blockStatePending || sc.pendingBlocks[height] != peerID {
return fmt.Errorf("Received block %d from peer %s without being requested", height, peerID)
}
pendingTime, ok := sc.pendingTime[height]
if !ok || now.Sub(pendingTime) <= 0 {
return fmt.Errorf("Clock error. Block %d received at %s but requested at %s",
height, now, pendingTime)
}
peer.lastRate = size / int64(now.Sub(pendingTime).Seconds())
sc.setStateAtHeight(height, blockStateReceived)
delete(sc.pendingBlocks, height)
delete(sc.pendingTime, height)
sc.receivedBlocks[height] = peerID
return nil
}
func (sc *schedule) markPending(peerID p2p.ID, height int64, time time.Time) error {
peer, ok := sc.peers[peerID]
if !ok {
return fmt.Errorf("Can't find peer %s", peerID)
}
state := sc.getStateAtHeight(height)
if state != blockStateNew {
return fmt.Errorf("Block %d should be in blockStateNew but was %s", height, state)
}
if peer.state != peerStateReady {
return fmt.Errorf("Cannot schedule %d from %s in %s", height, peerID, peer.state)
}
if height > peer.height {
return fmt.Errorf("Cannot request height %d from peer %s who is at height %d",
height, peerID, peer.height)
}
sc.setStateAtHeight(height, blockStatePending)
sc.pendingBlocks[height] = peerID
// XXX: to make this more accurate we can introduce a message from
// the IO routine which indicates the time the request was put on the wire
sc.pendingTime[height] = time
return nil
}
func (sc *schedule) markProcessed(height int64) error {
state := sc.getStateAtHeight(height)
if state != blockStateReceived {
return fmt.Errorf("Can't mark height %d processed from block state %s", height, state)
}
delete(sc.receivedBlocks, height)
sc.setStateAtHeight(height, blockStateProcessed)
return nil
}
// allBlocksProcessed returns true if all blocks are in blockStateProcessed,
// which determines whether the schedule has been completed
func (sc *schedule) allBlocksProcessed() bool {
for _, state := range sc.blockStates {
if state != blockStateProcessed {
return false
}
}
return true
}
// highest block | state == blockStateNew
func (sc *schedule) maxHeight() int64 {
var max int64 = 0
for height, state := range sc.blockStates {
if state == blockStateNew && height > max {
max = height
}
}
return max
}
// lowest block | state == blockStateNew
func (sc *schedule) minHeight() int64 {
var min int64 = math.MaxInt64
for height, state := range sc.blockStates {
if state == blockStateNew && height < min {
min = height
}
}
return min
}
func (sc *schedule) pendingFrom(peerID p2p.ID) []int64 {
heights := []int64{}
for height, pendingPeerID := range sc.pendingBlocks {
if pendingPeerID == peerID {
heights = append(heights, height)
}
}
return heights
}
func (sc *schedule) selectPeer(peers []*scPeer) *scPeer {
// FIXME: properPeerSelector
s := rand.NewSource(time.Now().Unix())
r := rand.New(s)
return peers[r.Intn(len(peers))]
}
// XXX: this duplicates the logic of peersInactiveSince and peersSlowerThan
func (sc *schedule) prunablePeers(peerTimeout time.Duration, minRecvRate int64, now time.Time) []p2p.ID {
prunable := []p2p.ID{}
for peerID, peer := range sc.peers {
if now.Sub(peer.lastTouched) > peerTimeout || peer.lastRate < minRecvRate {
prunable = append(prunable, peerID)
}
}
return prunable
}
func (sc *schedule) numBlockInState(targetState blockState) uint32 {
var num uint32 = 0
for _, state := range sc.blockStates {
if state == targetState {
num++
}
}
return num
}


@@ -0,0 +1,272 @@
package v2
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/tendermint/tendermint/p2p"
)
func TestScheduleInit(t *testing.T) {
var (
initHeight int64 = 5
sc = newSchedule(initHeight)
)
assert.Equal(t, blockStateNew, sc.getStateAtHeight(initHeight))
assert.Equal(t, blockStateProcessed, sc.getStateAtHeight(initHeight-1))
assert.Equal(t, blockStateUnknown, sc.getStateAtHeight(initHeight+1))
}
func TestAddPeer(t *testing.T) {
var (
initHeight int64 = 5
peerID p2p.ID = "1"
peerIDTwo p2p.ID = "2"
sc = newSchedule(initHeight)
)
assert.Nil(t, sc.addPeer(peerID))
assert.Nil(t, sc.addPeer(peerIDTwo))
assert.Error(t, sc.addPeer(peerID))
}
func TestTouchPeer(t *testing.T) {
var (
initHeight int64 = 5
peerID p2p.ID = "1"
sc = newSchedule(initHeight)
now = time.Now()
)
assert.Error(t, sc.touchPeer(peerID, now),
"Touching an unknown peer should return an error")
assert.Nil(t, sc.addPeer(peerID),
"Adding a peer should return no error")
assert.Nil(t, sc.touchPeer(peerID, now),
"Touching a peer should return no error")
threshold := 10 * time.Second
assert.Empty(t, sc.peersInactiveSince(threshold, now.Add(9*time.Second)),
"Expected no peers to have been touched over 9 seconds")
assert.Containsf(t, sc.peersInactiveSince(threshold, now.Add(11*time.Second)), peerID,
"Expected %s to have been touched more than 10 seconds ago", peerID)
}
func TestPeerHeight(t *testing.T) {
var (
initHeight int64 = 5
peerID p2p.ID = "1"
peerHeight int64 = 20
sc = newSchedule(initHeight)
)
assert.NoError(t, sc.addPeer(peerID),
"Adding a peer should return no error")
assert.NoError(t, sc.setPeerHeight(peerID, peerHeight))
for i := initHeight; i <= peerHeight; i++ {
assert.Equal(t, sc.getStateAtHeight(i), blockStateNew,
"Expected all blocks to be in blockStateNew")
peerIDs := []p2p.ID{}
for _, peer := range sc.getPeersAtHeight(i) {
peerIDs = append(peerIDs, peer.peerID)
}
assert.Containsf(t, peerIDs, peerID,
"Expected %s to have block %d", peerID, i)
}
}
func TestTransitionPending(t *testing.T) {
var (
initHeight int64 = 5
peerID p2p.ID = "1"
peerIDTwo p2p.ID = "2"
peerHeight int64 = 20
sc = newSchedule(initHeight)
now = time.Now()
)
assert.NoError(t, sc.addPeer(peerID),
"Adding a peer should return no error")
assert.Nil(t, sc.addPeer(peerIDTwo),
"Adding a peer should return no error")
assert.Error(t, sc.markPending(peerID, peerHeight, now),
"Expected scheduling a block from a peer in peerStateNew to fail")
assert.NoError(t, sc.setPeerHeight(peerID, peerHeight),
"Expected setPeerHeight to return no error")
assert.NoError(t, sc.setPeerHeight(peerIDTwo, peerHeight),
"Expected setPeerHeight to return no error")
assert.NoError(t, sc.markPending(peerID, peerHeight, now),
"Expected markingPending new block to succeed")
assert.Error(t, sc.markPending(peerIDTwo, peerHeight, now),
"Expected markingPending by a second peer to fail")
assert.Equal(t, blockStatePending, sc.getStateAtHeight(peerHeight),
"Expected the block to be in blockStatePending")
assert.NoError(t, sc.removePeer(peerID),
"Expected removePeer to return no error")
assert.Equal(t, blockStateNew, sc.getStateAtHeight(peerHeight),
"Expected the block to be in blockStateNew")
assert.Error(t, sc.markPending(peerID, peerHeight, now),
"Expected markingPending removed peer to fail")
assert.NoError(t, sc.markPending(peerIDTwo, peerHeight, now),
"Expected markingPending on a ready peer to succeed")
assert.Equal(t, blockStatePending, sc.getStateAtHeight(peerHeight),
"Expected the block to be in blockStatePending")
}
func TestTransitionReceived(t *testing.T) {
var (
initHeight int64 = 5
peerID p2p.ID = "1"
peerIDTwo p2p.ID = "2"
peerHeight int64 = 20
blockSize int64 = 1024
sc = newSchedule(initHeight)
now = time.Now()
receivedAt = now.Add(1 * time.Second)
)
assert.NoError(t, sc.addPeer(peerID),
"Expected adding peer %s to succeed", peerID)
assert.NoError(t, sc.addPeer(peerIDTwo),
"Expected adding peer %s to succeed", peerIDTwo)
assert.NoError(t, sc.setPeerHeight(peerID, peerHeight),
"Expected setPeerHeight to return no error")
assert.NoErrorf(t, sc.setPeerHeight(peerIDTwo, peerHeight),
"Expected setPeerHeight on %s to %d to succeed", peerIDTwo, peerHeight)
assert.NoError(t, sc.markPending(peerID, initHeight, now),
"Expected markingPending new block to succeed")
assert.Error(t, sc.markReceived(peerIDTwo, initHeight, blockSize, receivedAt),
"Expected marking markReceived from a non requesting peer to fail")
assert.NoError(t, sc.markReceived(peerID, initHeight, blockSize, receivedAt),
"Expected marking markReceived on a pending block to succeed")
assert.Error(t, sc.markReceived(peerID, initHeight, blockSize, receivedAt),
"Expected marking markReceived on received block to fail")
assert.Equalf(t, blockStateReceived, sc.getStateAtHeight(initHeight),
"Expected block %d to be blockHeightReceived", initHeight)
assert.NoErrorf(t, sc.removePeer(peerID),
"Expected removePeer removing %s to succeed", peerID)
assert.Equalf(t, blockStateNew, sc.getStateAtHeight(initHeight),
"Expected block %d to be blockStateNew", initHeight)
assert.NoErrorf(t, sc.markPending(peerIDTwo, initHeight, now),
"Expected markingPending %d from %s to succeed", initHeight, peerIDTwo)
assert.NoErrorf(t, sc.markReceived(peerIDTwo, initHeight, blockSize, receivedAt),
"Expected marking markReceived %d from %s to succeed", initHeight, peerIDTwo)
assert.Equalf(t, blockStateReceived, sc.getStateAtHeight(initHeight),
"Expected block %d to be blockStateReceived", initHeight)
}
func TestTransitionProcessed(t *testing.T) {
var (
initHeight int64 = 5
peerID p2p.ID = "1"
peerHeight int64 = 20
blockSize int64 = 1024
sc = newSchedule(initHeight)
now = time.Now()
receivedAt = now.Add(1 * time.Second)
)
assert.NoError(t, sc.addPeer(peerID),
"Expected adding peer %s to succeed", peerID)
assert.NoErrorf(t, sc.setPeerHeight(peerID, peerHeight),
"Expected setPeerHeight on %s to %d to succeed", peerID, peerHeight)
assert.NoError(t, sc.markPending(peerID, initHeight, now),
"Expected markingPending new block to succeed")
assert.NoError(t, sc.markReceived(peerID, initHeight, blockSize, receivedAt),
"Expected marking markReceived on a pending block to succeed")
assert.Error(t, sc.markProcessed(initHeight+1),
"Expected marking %d as processed to fail", initHeight+1)
assert.NoError(t, sc.markProcessed(initHeight),
"Expected marking %d as processed to succeed", initHeight)
assert.Equalf(t, blockStateProcessed, sc.getStateAtHeight(initHeight),
"Expected block %d to be blockStateProcessed", initHeight)
assert.NoError(t, sc.removePeer(peerID),
"Expected removing peer %s to succeed", peerID)
assert.Equalf(t, blockStateProcessed, sc.getStateAtHeight(initHeight),
"Expected block %d to be blockStateProcessed", initHeight)
}
func TestMinMaxHeight(t *testing.T) {
var (
initHeight int64 = 5
peerID p2p.ID = "1"
peerHeight int64 = 20
sc = newSchedule(initHeight)
now = time.Now()
)
assert.Equal(t, initHeight, sc.minHeight(),
"Expected min height to be the initialized height")
assert.Equal(t, initHeight, sc.maxHeight(),
"Expected max height to be the initialized height")
assert.NoError(t, sc.addPeer(peerID),
"Adding a peer should return no error")
assert.NoError(t, sc.setPeerHeight(peerID, peerHeight),
"Expected setPeerHeight to return no error")
assert.Equal(t, peerHeight, sc.maxHeight(),
"Expected max height to increase to peerHeight")
assert.Nil(t, sc.markPending(peerID, initHeight, now.Add(1*time.Second)),
"Expected marking initHeight as pending to return no error")
assert.Equal(t, initHeight+1, sc.minHeight(),
"Expected marking initHeight as pending to move minHeight forward")
}
func TestPeersSlowerThan(t *testing.T) {
var (
initHeight int64 = 5
peerID p2p.ID = "1"
peerHeight int64 = 20
blockSize int64 = 1024
sc = newSchedule(initHeight)
now = time.Now()
receivedAt = now.Add(1 * time.Second)
)
assert.NoError(t, sc.addPeer(peerID),
"Adding a peer should return no error")
assert.NoError(t, sc.setPeerHeight(peerID, peerHeight),
"Expected setPeerHeight to return no error")
assert.NoError(t, sc.markPending(peerID, peerHeight, now),
"Expected markPending to return no error")
assert.NoError(t, sc.markReceived(peerID, peerHeight, blockSize, receivedAt),
"Expected markReceived to return no error")
assert.Empty(t, sc.peersSlowerThan(blockSize-1),
"expected no peers to be slower than blockSize-1 bytes/sec")
assert.Containsf(t, sc.peersSlowerThan(blockSize+1), peerID,
"expected %s to be slower than blockSize+1 bytes/sec", peerID)
}


@@ -48,15 +48,17 @@ func main() {
os.Exit(1)
}
rs := privval.NewSignerServiceEndpoint(logger, *chainID, pv, dialer)
err := rs.Start()
sd := privval.NewSignerDialerEndpoint(logger, dialer)
ss := privval.NewSignerServer(sd, *chainID, pv)
err := ss.Start()
if err != nil {
panic(err)
}
// Stop upon receiving SIGTERM or CTRL-C.
cmn.TrapSignal(logger, func() {
err := rs.Stop()
err := ss.Stop()
if err != nil {
panic(err)
}


@@ -294,6 +294,9 @@ max_txs_bytes = {{ .Mempool.MaxTxsBytes }}
# Size of the cache (used to filter transactions we saw earlier) in transactions
cache_size = {{ .Mempool.CacheSize }}
# Limit the size of TxMessage
max_msg_bytes = {{ .Mempool.MaxMsgBytes }}
##### fast sync configuration options #####
[fastsync]
@@ -302,9 +305,6 @@ cache_size = {{ .Mempool.CacheSize }}
# 2) "v1" - refactor of v0 version for better testability
version = "{{ .FastSync.Version }}"
# Limit the size of TxMessage
max_msg_bytes = {{ .Mempool.MaxMsgBytes }}
##### consensus configuration options #####
[consensus]


@@ -1518,9 +1518,11 @@ func (cs *ConsensusState) tryAddVote(vote *types.Vote, peerID p2p.ID) (bool, err
cs.evpool.AddEvidence(voteErr.DuplicateVoteEvidence)
return added, err
} else {
// Probably an invalid signature / Bad peer.
// Seems this can also err sometimes with "Unexpected step" - perhaps not from a bad peer ?
cs.Logger.Error("Error attempting to add vote", "err", err)
// Either
// 1) bad peer OR
// 2) not a bad peer? this can also err sometimes with "Unexpected step" OR
// 3) tmkms use with multiple validators connecting to a single tmkms instance (https://github.com/tendermint/tendermint/issues/3839).
cs.Logger.Info("Error attempting to add vote", "err", err)
return added, ErrAddingVote
}
}


@@ -240,6 +240,9 @@ max_txs_bytes = 1073741824
# Size of the cache (used to filter transactions we saw earlier) in transactions
cache_size = 10000
# Limit the size of TxMessage
max_msg_bytes = 1048576
##### fast sync configuration options #####
[fastsync]
@@ -248,9 +251,6 @@ cache_size = 10000
# 2) "v1" - refactor of v0 version for better testability
version = "v0"
# Limit the size of TxMessage
max_msg_bytes = 1048576
##### consensus configuration options #####
[consensus]


@@ -23,7 +23,7 @@ import (
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/consensus"
cs "github.com/tendermint/tendermint/consensus"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/evidence"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
@@ -278,9 +278,7 @@ func doHandshake(stateDB dbm.DB, state sm.State, blockStore sm.BlockStore,
return nil
}
func logNodeStartupInfo(state sm.State, privValidator types.PrivValidator, logger,
consensusLogger log.Logger) {
func logNodeStartupInfo(state sm.State, pubKey crypto.PubKey, logger, consensusLogger log.Logger) {
// Log the version info.
logger.Info("Version info",
"software", version.TMCoreSemVer,
@@ -296,7 +294,6 @@ func logNodeStartupInfo(state sm.State, privValidator types.PrivValidator, logge
)
}
pubKey := privValidator.GetPubKey()
addr := pubKey.Address()
// Log whether this node is a validator or an observer
if state.Validators.HasAddress(addr) {
@@ -601,7 +598,13 @@ func NewNode(config *cfg.Config,
}
}
logNodeStartupInfo(state, privValidator, logger, consensusLogger)
pubKey := privValidator.GetPubKey()
if pubKey == nil {
// TODO: GetPubKey should return errors - https://github.com/tendermint/tendermint/issues/3602
return nil, errors.New("could not retrieve public key from private validator")
}
logNodeStartupInfo(state, pubKey, logger, consensusLogger)
// Decide whether to fast-sync or not
// We don't fast-sync when the only validator is us.
@@ -1158,29 +1161,13 @@ func createAndStartPrivValidatorSocketClient(
listenAddr string,
logger log.Logger,
) (types.PrivValidator, error) {
var listener net.Listener
protocol, address := cmn.ProtocolAndAddress(listenAddr)
ln, err := net.Listen(protocol, address)
pve, err := privval.NewSignerListener(listenAddr, logger)
if err != nil {
return nil, err
}
switch protocol {
case "unix":
listener = privval.NewUnixListener(ln)
case "tcp":
// TODO: persist this key so external signer
// can actually authenticate us
listener = privval.NewTCPListener(ln, ed25519.GenPrivKey())
default:
return nil, fmt.Errorf(
"wrong listen address: expected either 'tcp' or 'unix' protocols, got %s",
protocol,
)
return nil, errors.Wrap(err, "failed to start private validator")
}
pvsc := privval.NewSignerValidatorEndpoint(logger.With("module", "privval"), listener)
if err := pvsc.Start(); err != nil {
pvsc, err := privval.NewSignerClient(pve)
if err != nil {
return nil, errors.Wrap(err, "failed to start private validator")
}


@@ -136,25 +136,29 @@ func TestNodeSetPrivValTCP(t *testing.T) {
config.BaseConfig.PrivValidatorListenAddr = addr
dialer := privval.DialTCPFn(addr, 100*time.Millisecond, ed25519.GenPrivKey())
pvsc := privval.NewSignerServiceEndpoint(
dialerEndpoint := privval.NewSignerDialerEndpoint(
log.TestingLogger(),
config.ChainID(),
types.NewMockPV(),
dialer,
)
privval.SignerServiceEndpointTimeoutReadWrite(100 * time.Millisecond)(pvsc)
privval.SignerDialerEndpointTimeoutReadWrite(100 * time.Millisecond)(dialerEndpoint)
signerServer := privval.NewSignerServer(
dialerEndpoint,
config.ChainID(),
types.NewMockPV(),
)
go func() {
err := pvsc.Start()
err := signerServer.Start()
if err != nil {
panic(err)
}
}()
defer pvsc.Stop()
defer signerServer.Stop()
n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.IsType(t, &privval.SignerValidatorEndpoint{}, n.PrivValidator())
assert.IsType(t, &privval.SignerClient{}, n.PrivValidator())
}
// address without a protocol must result in error
@@ -178,13 +182,17 @@ func TestNodeSetPrivValIPC(t *testing.T) {
config.BaseConfig.PrivValidatorListenAddr = "unix://" + tmpfile
dialer := privval.DialUnixFn(tmpfile)
pvsc := privval.NewSignerServiceEndpoint(
dialerEndpoint := privval.NewSignerDialerEndpoint(
log.TestingLogger(),
config.ChainID(),
types.NewMockPV(),
dialer,
)
privval.SignerServiceEndpointTimeoutReadWrite(100 * time.Millisecond)(pvsc)
privval.SignerDialerEndpointTimeoutReadWrite(100 * time.Millisecond)(dialerEndpoint)
pvsc := privval.NewSignerServer(
dialerEndpoint,
config.ChainID(),
types.NewMockPV(),
)
go func() {
err := pvsc.Start()
@@ -194,8 +202,7 @@ func TestNodeSetPrivValIPC(t *testing.T) {
n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.IsType(t, &privval.SignerValidatorEndpoint{}, n.PrivValidator())
assert.IsType(t, &privval.SignerClient{}, n.PrivValidator())
}
// testFreeAddr claims a free port so we don't block on listener being ready.


@@ -6,16 +6,16 @@ FilePV
FilePV is the simplest implementation and developer default. It uses one file for the private key and another to store state.
SignerValidatorEndpoint
SignerListenerEndpoint
SignerValidatorEndpoint establishes a connection to an external process, like a Key Management Server (KMS), using a socket.
SignerValidatorEndpoint listens for the external KMS process to dial in.
SignerValidatorEndpoint takes a listener, which determines the type of connection
SignerListenerEndpoint establishes a connection to an external process, like a Key Management Server (KMS), using a socket.
SignerListenerEndpoint listens for the external KMS process to dial in.
SignerListenerEndpoint takes a listener, which determines the type of connection
(ie. encrypted over tcp, or unencrypted over unix).
SignerServiceEndpoint
SignerDialerEndpoint
SignerServiceEndpoint is a simple wrapper around a net.Conn. It's used by both IPCVal and TCPVal.
SignerDialerEndpoint is a simple wrapper around a net.Conn. It's used by both IPCVal and TCPVal.
*/
package privval


@@ -4,10 +4,21 @@ import (
"fmt"
)
type EndpointTimeoutError struct{}
// Implement the net.Error interface.
func (e EndpointTimeoutError) Error() string { return "endpoint connection timed out" }
func (e EndpointTimeoutError) Timeout() bool { return true }
func (e EndpointTimeoutError) Temporary() bool { return true }
// Socket errors.
var (
ErrUnexpectedResponse = fmt.Errorf("received unexpected response")
ErrConnTimeout = fmt.Errorf("remote signer timed out")
ErrNoConnection = fmt.Errorf("endpoint is not connected")
ErrConnectionTimeout = EndpointTimeoutError{}
ErrReadTimeout = fmt.Errorf("endpoint read timed out")
ErrWriteTimeout = fmt.Errorf("endpoint write timed out")
)
// RemoteSignerError allows (remote) validators to include meaningful error descriptions in their reply.
@@ -18,5 +29,5 @@ type RemoteSignerError struct {
}
func (e *RemoteSignerError) Error() string {
return fmt.Sprintf("signerServiceEndpoint returned error #%d: %s", e.Code, e.Description)
return fmt.Sprintf("signerEndpoint returned error #%d: %s", e.Code, e.Description)
}


@@ -67,11 +67,11 @@ func assertEqualPV(t *testing.T, oldPV *privval.OldFilePV, newPV *privval.FilePV
}
func initTmpOldFile(t *testing.T) string {
tmpfile, err := ioutil.TempFile("", "priv_validator_*.json")
tmpFile, err := ioutil.TempFile("", "priv_validator_*.json")
require.NoError(t, err)
t.Logf("created test file %s", tmpfile.Name())
_, err = tmpfile.WriteString(oldPrivvalContent)
t.Logf("created test file %s", tmpFile.Name())
_, err = tmpFile.WriteString(oldPrivvalContent)
require.NoError(t, err)
return tmpfile.Name()
return tmpFile.Name()
}


@@ -58,7 +58,7 @@ func TestResetValidator(t *testing.T) {
// priv val after signing is not same as empty
assert.NotEqual(t, privVal.LastSignState, emptyState)
// priv val after reset is same as empty
// priv val after Reset is same as empty
privVal.Reset()
assert.Equal(t, privVal.LastSignState, emptyState)
}
@@ -164,6 +164,7 @@ func TestSignVote(t *testing.T) {
block1 := types.BlockID{Hash: []byte{1, 2, 3}, PartsHeader: types.PartSetHeader{}}
block2 := types.BlockID{Hash: []byte{3, 2, 1}, PartsHeader: types.PartSetHeader{}}
height, round := int64(10), 1
voteType := byte(types.PrevoteType)


@@ -1,61 +1,64 @@
package privval
import (
amino "github.com/tendermint/go-amino"
"github.com/tendermint/go-amino"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/types"
)
// RemoteSignerMsg is sent between SignerServiceEndpoint and the SignerServiceEndpoint client.
type RemoteSignerMsg interface{}
// SignerMessage is sent between Signer Clients and Servers.
type SignerMessage interface{}
func RegisterRemoteSignerMsg(cdc *amino.Codec) {
cdc.RegisterInterface((*RemoteSignerMsg)(nil), nil)
cdc.RegisterInterface((*SignerMessage)(nil), nil)
cdc.RegisterConcrete(&PubKeyRequest{}, "tendermint/remotesigner/PubKeyRequest", nil)
cdc.RegisterConcrete(&PubKeyResponse{}, "tendermint/remotesigner/PubKeyResponse", nil)
cdc.RegisterConcrete(&SignVoteRequest{}, "tendermint/remotesigner/SignVoteRequest", nil)
cdc.RegisterConcrete(&SignedVoteResponse{}, "tendermint/remotesigner/SignedVoteResponse", nil)
cdc.RegisterConcrete(&SignProposalRequest{}, "tendermint/remotesigner/SignProposalRequest", nil)
cdc.RegisterConcrete(&SignedProposalResponse{}, "tendermint/remotesigner/SignedProposalResponse", nil)
cdc.RegisterConcrete(&PingRequest{}, "tendermint/remotesigner/PingRequest", nil)
cdc.RegisterConcrete(&PingResponse{}, "tendermint/remotesigner/PingResponse", nil)
}
// TODO: Add ChainIDRequest
// PubKeyRequest requests the consensus public key from the remote signer.
type PubKeyRequest struct{}
// PubKeyResponse is a PrivValidatorSocket message containing the public key.
// PubKeyResponse is a response message containing the public key.
type PubKeyResponse struct {
PubKey crypto.PubKey
Error *RemoteSignerError
}
// SignVoteRequest is a PrivValidatorSocket message containing a vote.
// SignVoteRequest is a request to sign a vote
type SignVoteRequest struct {
Vote *types.Vote
}
// SignedVoteResponse is a PrivValidatorSocket message containing a signed vote along with a potenial error message.
// SignedVoteResponse is a response containing a signed vote or an error
type SignedVoteResponse struct {
Vote *types.Vote
Error *RemoteSignerError
}
// SignProposalRequest is a PrivValidatorSocket message containing a Proposal.
// SignProposalRequest is a request to sign a proposal
type SignProposalRequest struct {
Proposal *types.Proposal
}
// SignedProposalResponse is a PrivValidatorSocket message containing a proposal response
// SignedProposalResponse is a response containing a signed proposal or an error
type SignedProposalResponse struct {
Proposal *types.Proposal
Error *RemoteSignerError
}
// PingRequest is a PrivValidatorSocket message to keep the connection alive.
// PingRequest is a request to confirm that the connection is alive.
type PingRequest struct {
}
// PingRequest is a PrivValidatorSocket response to keep the connection alive.
// PingResponse is a response to confirm that the connection is alive.
type PingResponse struct {
}

privval/signer_client.go Normal file

@@ -0,0 +1,131 @@
package privval
import (
"time"
"github.com/pkg/errors"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/types"
)
// SignerClient implements PrivValidator.
// Handles remote validator connections that provide signing services
type SignerClient struct {
endpoint *SignerListenerEndpoint
}
var _ types.PrivValidator = (*SignerClient)(nil)
// NewSignerClient returns an instance of SignerClient.
// It will start the endpoint (if not already started)
func NewSignerClient(endpoint *SignerListenerEndpoint) (*SignerClient, error) {
if !endpoint.IsRunning() {
if err := endpoint.Start(); err != nil {
return nil, errors.Wrap(err, "failed to start listener endpoint")
}
}
return &SignerClient{endpoint: endpoint}, nil
}
// Close closes the underlying connection
func (sc *SignerClient) Close() error {
return sc.endpoint.Close()
}
// IsConnected indicates whether the signer is connected to a remote signing service
func (sc *SignerClient) IsConnected() bool {
return sc.endpoint.IsConnected()
}
// WaitForConnection waits maxWait for a connection or returns a timeout error
func (sc *SignerClient) WaitForConnection(maxWait time.Duration) error {
return sc.endpoint.WaitForConnection(maxWait)
}
//--------------------------------------------------------
// Implement PrivValidator
// Ping sends a ping request to the remote signer
func (sc *SignerClient) Ping() error {
response, err := sc.endpoint.SendRequest(&PingRequest{})
if err != nil {
sc.endpoint.Logger.Error("SignerClient::Ping", "err", err)
return nil
}
_, ok := response.(*PingResponse)
if !ok {
sc.endpoint.Logger.Error("SignerClient::Ping", "err", "response != PingResponse")
return ErrUnexpectedResponse
}
return nil
}
// GetPubKey retrieves a public key from a remote signer
func (sc *SignerClient) GetPubKey() crypto.PubKey {
response, err := sc.endpoint.SendRequest(&PubKeyRequest{})
if err != nil {
sc.endpoint.Logger.Error("SignerClient::GetPubKey", "err", err)
return nil
}
pubKeyResp, ok := response.(*PubKeyResponse)
if !ok {
sc.endpoint.Logger.Error("SignerClient::GetPubKey", "err", "response != PubKeyResponse")
return nil
}
if pubKeyResp.Error != nil {
sc.endpoint.Logger.Error("failed to get private validator's public key", "err", pubKeyResp.Error)
return nil
}
return pubKeyResp.PubKey
}
// SignVote requests a remote signer to sign a vote
func (sc *SignerClient) SignVote(chainID string, vote *types.Vote) error {
response, err := sc.endpoint.SendRequest(&SignVoteRequest{Vote: vote})
if err != nil {
sc.endpoint.Logger.Error("SignerClient::SignVote", "err", err)
return err
}
resp, ok := response.(*SignedVoteResponse)
if !ok {
sc.endpoint.Logger.Error("SignerClient::SignVote", "err", "response != SignedVoteResponse")
return ErrUnexpectedResponse
}
if resp.Error != nil {
return resp.Error
}
*vote = *resp.Vote
return nil
}
// SignProposal requests a remote signer to sign a proposal
func (sc *SignerClient) SignProposal(chainID string, proposal *types.Proposal) error {
response, err := sc.endpoint.SendRequest(&SignProposalRequest{Proposal: proposal})
if err != nil {
sc.endpoint.Logger.Error("SignerClient::SignProposal", "err", err)
return err
}
resp, ok := response.(*SignedProposalResponse)
if !ok {
sc.endpoint.Logger.Error("SignerClient::SignProposal", "err", "response != SignedProposalResponse")
return ErrUnexpectedResponse
}
if resp.Error != nil {
return resp.Error
}
*proposal = *resp.Proposal
return nil
}
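Every `SignerClient` method follows the same pattern: send a typed request over the endpoint, type-assert the response, and surface `resp.Error` if the remote signer refused. A self-contained sketch of that dispatch pattern, using hypothetical in-process message and endpoint types rather than the real amino-encoded transport:

```go
package main

import (
	"errors"
	"fmt"
)

var errUnexpectedResponse = errors.New("received unexpected response")

// Hypothetical stand-ins for the registered request/response messages.
type pingRequest struct{}
type pingResponse struct{}
type pubKeyRequest struct{}
type pubKeyResponse struct{ PubKey string }

// endpoint is a stub transport that answers requests in-process.
type endpoint struct{}

func (e *endpoint) SendRequest(req interface{}) (interface{}, error) {
	switch req.(type) {
	case *pingRequest:
		return &pingResponse{}, nil
	case *pubKeyRequest:
		return &pubKeyResponse{PubKey: "ed25519:abcd"}, nil
	}
	return nil, fmt.Errorf("unknown request %T", req)
}

// getPubKey mirrors the client pattern: send request, assert response type,
// and return a sentinel error if the reply has the wrong shape.
func getPubKey(e *endpoint) (string, error) {
	resp, err := e.SendRequest(&pubKeyRequest{})
	if err != nil {
		return "", err
	}
	pk, ok := resp.(*pubKeyResponse)
	if !ok {
		return "", errUnexpectedResponse
	}
	return pk.PubKey, nil
}

func main() {
	pk, err := getPubKey(&endpoint{})
	fmt.Println(pk, err) // ed25519:abcd <nil>
}
```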


@@ -0,0 +1,257 @@
package privval
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/types"
)
type signerTestCase struct {
chainID string
mockPV types.PrivValidator
signerClient *SignerClient
signerServer *SignerServer
}
func getSignerTestCases(t *testing.T) []signerTestCase {
testCases := make([]signerTestCase, 0)
// Get test cases for each possible dialer (DialTCP / DialUnix / etc)
for _, dtc := range getDialerTestCases(t) {
chainID := common.RandStr(12)
mockPV := types.NewMockPV()
// get a pair of signer listener, signer dialer endpoints
sl, sd := getMockEndpoints(t, dtc.addr, dtc.dialer)
sc, err := NewSignerClient(sl)
require.NoError(t, err)
ss := NewSignerServer(sd, chainID, mockPV)
err = ss.Start()
require.NoError(t, err)
tc := signerTestCase{
chainID: chainID,
mockPV: mockPV,
signerClient: sc,
signerServer: ss,
}
testCases = append(testCases, tc)
}
return testCases
}
func TestSignerClose(t *testing.T) {
for _, tc := range getSignerTestCases(t) {
err := tc.signerClient.Close()
assert.NoError(t, err)
err = tc.signerServer.Stop()
assert.NoError(t, err)
}
}
func TestSignerPing(t *testing.T) {
for _, tc := range getSignerTestCases(t) {
defer tc.signerServer.Stop()
defer tc.signerClient.Close()
err := tc.signerClient.Ping()
assert.NoError(t, err)
}
}
func TestSignerGetPubKey(t *testing.T) {
for _, tc := range getSignerTestCases(t) {
defer tc.signerServer.Stop()
defer tc.signerClient.Close()
pubKey := tc.signerClient.GetPubKey()
expectedPubKey := tc.mockPV.GetPubKey()
assert.Equal(t, expectedPubKey, pubKey)
addr := tc.signerClient.GetPubKey().Address()
expectedAddr := tc.mockPV.GetPubKey().Address()
assert.Equal(t, expectedAddr, addr)
}
}
func TestSignerProposal(t *testing.T) {
for _, tc := range getSignerTestCases(t) {
ts := time.Now()
want := &types.Proposal{Timestamp: ts}
have := &types.Proposal{Timestamp: ts}
defer tc.signerServer.Stop()
defer tc.signerClient.Close()
require.NoError(t, tc.mockPV.SignProposal(tc.chainID, want))
require.NoError(t, tc.signerClient.SignProposal(tc.chainID, have))
assert.Equal(t, want.Signature, have.Signature)
}
}
func TestSignerVote(t *testing.T) {
for _, tc := range getSignerTestCases(t) {
ts := time.Now()
want := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
have := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
defer tc.signerServer.Stop()
defer tc.signerClient.Close()
require.NoError(t, tc.mockPV.SignVote(tc.chainID, want))
require.NoError(t, tc.signerClient.SignVote(tc.chainID, have))
assert.Equal(t, want.Signature, have.Signature)
}
}
func TestSignerVoteResetDeadline(t *testing.T) {
for _, tc := range getSignerTestCases(t) {
ts := time.Now()
want := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
have := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
defer tc.signerServer.Stop()
defer tc.signerClient.Close()
time.Sleep(testTimeoutReadWrite2o3)
require.NoError(t, tc.mockPV.SignVote(tc.chainID, want))
require.NoError(t, tc.signerClient.SignVote(tc.chainID, have))
assert.Equal(t, want.Signature, have.Signature)
// TODO(jleni): Clarify what is actually being tested
// This would exceed the deadline if it was not extended by the previous message
time.Sleep(testTimeoutReadWrite2o3)
require.NoError(t, tc.mockPV.SignVote(tc.chainID, want))
require.NoError(t, tc.signerClient.SignVote(tc.chainID, have))
assert.Equal(t, want.Signature, have.Signature)
}
}
func TestSignerVoteKeepAlive(t *testing.T) {
for _, tc := range getSignerTestCases(t) {
ts := time.Now()
want := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
have := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
defer tc.signerServer.Stop()
defer tc.signerClient.Close()
// Check that the service is still available even if the client does not
// request a signature for a long time.
// In this particular case, we use the dialer logger to ensure that
// test messages are properly interleaved in the test logs
tc.signerServer.Logger.Debug("TEST: Forced Wait -------------------------------------------------")
time.Sleep(testTimeoutReadWrite * 3)
tc.signerServer.Logger.Debug("TEST: Forced Wait DONE---------------------------------------------")
require.NoError(t, tc.mockPV.SignVote(tc.chainID, want))
require.NoError(t, tc.signerClient.SignVote(tc.chainID, have))
assert.Equal(t, want.Signature, have.Signature)
}
}
func TestSignerSignProposalErrors(t *testing.T) {
for _, tc := range getSignerTestCases(t) {
// Replace service with a mock that always fails
tc.signerServer.privVal = types.NewErroringMockPV()
tc.mockPV = types.NewErroringMockPV()
defer tc.signerServer.Stop()
defer tc.signerClient.Close()
ts := time.Now()
proposal := &types.Proposal{Timestamp: ts}
err := tc.signerClient.SignProposal(tc.chainID, proposal)
require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
err = tc.mockPV.SignProposal(tc.chainID, proposal)
require.Error(t, err)
err = tc.signerClient.SignProposal(tc.chainID, proposal)
require.Error(t, err)
}
}
func TestSignerSignVoteErrors(t *testing.T) {
for _, tc := range getSignerTestCases(t) {
ts := time.Now()
vote := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
// Replace signer service privval with one that always fails
tc.signerServer.privVal = types.NewErroringMockPV()
tc.mockPV = types.NewErroringMockPV()
defer tc.signerServer.Stop()
defer tc.signerClient.Close()
err := tc.signerClient.SignVote(tc.chainID, vote)
require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
err = tc.mockPV.SignVote(tc.chainID, vote)
require.Error(t, err)
err = tc.signerClient.SignVote(tc.chainID, vote)
require.Error(t, err)
}
}
func brokenHandler(privVal types.PrivValidator, request SignerMessage, chainID string) (SignerMessage, error) {
var res SignerMessage
var err error
switch r := request.(type) {
// This is broken and will answer most requests with a pubkey response
case *PubKeyRequest:
res = &PubKeyResponse{nil, nil}
case *SignVoteRequest:
res = &PubKeyResponse{nil, nil}
case *SignProposalRequest:
res = &PubKeyResponse{nil, nil}
case *PingRequest:
err, res = nil, &PingResponse{}
default:
err = fmt.Errorf("unknown msg: %v", r)
}
return res, err
}
func TestSignerUnexpectedResponse(t *testing.T) {
for _, tc := range getSignerTestCases(t) {
tc.signerServer.privVal = types.NewMockPV()
tc.mockPV = types.NewMockPV()
tc.signerServer.SetRequestHandler(brokenHandler)
defer tc.signerServer.Stop()
defer tc.signerClient.Close()
ts := time.Now()
want := &types.Vote{Timestamp: ts, Type: types.PrecommitType}
e := tc.signerClient.SignVote(tc.chainID, want)
assert.EqualError(t, e, "received unexpected response")
}
}


@@ -0,0 +1,84 @@
package privval
import (
"time"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
)
const (
defaultMaxDialRetries = 10
defaultRetryWaitMilliseconds = 100
)
// SignerServiceEndpointOption sets an optional parameter on the SignerDialerEndpoint.
type SignerServiceEndpointOption func(*SignerDialerEndpoint)
// SignerDialerEndpointTimeoutReadWrite sets the read and write timeout for connections
// from external signing processes.
func SignerDialerEndpointTimeoutReadWrite(timeout time.Duration) SignerServiceEndpointOption {
return func(ss *SignerDialerEndpoint) { ss.timeoutReadWrite = timeout }
}
// SignerDialerEndpointConnRetries sets the amount of attempted retries to acceptNewConnection.
func SignerDialerEndpointConnRetries(retries int) SignerServiceEndpointOption {
return func(ss *SignerDialerEndpoint) { ss.maxConnRetries = retries }
}
// SignerDialerEndpoint dials using its dialer and responds to any
// signature requests using its privVal.
type SignerDialerEndpoint struct {
signerEndpoint
dialer SocketDialer
retryWait time.Duration
maxConnRetries int
}
// NewSignerDialerEndpoint returns a SignerDialerEndpoint that will dial using the given
// dialer and respond to any signature requests over the connection
// using the given privVal.
func NewSignerDialerEndpoint(
logger log.Logger,
dialer SocketDialer,
) *SignerDialerEndpoint {
sd := &SignerDialerEndpoint{
dialer: dialer,
retryWait: defaultRetryWaitMilliseconds * time.Millisecond,
maxConnRetries: defaultMaxDialRetries,
}
sd.BaseService = *cmn.NewBaseService(logger, "SignerDialerEndpoint", sd)
sd.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second
return sd
}
func (sd *SignerDialerEndpoint) ensureConnection() error {
if sd.IsConnected() {
return nil
}
retries := 0
for retries < sd.maxConnRetries {
conn, err := sd.dialer()
if err != nil {
retries++
sd.Logger.Debug("SignerDialer: Reconnection failed", "retries", retries, "max", sd.maxConnRetries, "err", err)
// Wait between retries
time.Sleep(sd.retryWait)
} else {
sd.SetConnection(conn)
sd.Logger.Debug("SignerDialer: Connection Ready")
return nil
}
}
sd.Logger.Debug("SignerDialer: Max retries exceeded", "retries", retries, "max", sd.maxConnRetries)
return ErrNoConnection
}

privval/signer_endpoint.go Normal file

@@ -0,0 +1,156 @@
package privval
import (
"fmt"
"net"
"sync"
"time"
"github.com/pkg/errors"
cmn "github.com/tendermint/tendermint/libs/common"
)
const (
defaultTimeoutReadWriteSeconds = 3
)
type signerEndpoint struct {
cmn.BaseService
connMtx sync.Mutex
conn net.Conn
timeoutReadWrite time.Duration
}
// Close closes the underlying net.Conn.
func (se *signerEndpoint) Close() error {
se.DropConnection()
return nil
}
// IsConnected indicates if there is an active connection
func (se *signerEndpoint) IsConnected() bool {
se.connMtx.Lock()
defer se.connMtx.Unlock()
return se.isConnected()
}
// GetAvailableConnection returns a connection if one is immediately available
func (se *signerEndpoint) GetAvailableConnection(connectionAvailableCh chan net.Conn) bool {
se.connMtx.Lock()
defer se.connMtx.Unlock()
// Is there a connection ready?
select {
case se.conn = <-connectionAvailableCh:
return true
default:
}
return false
}
// WaitConnection waits up to maxWait for a connection to become available
func (se *signerEndpoint) WaitConnection(connectionAvailableCh chan net.Conn, maxWait time.Duration) error {
se.connMtx.Lock()
defer se.connMtx.Unlock()
select {
case se.conn = <-connectionAvailableCh:
case <-time.After(maxWait):
return ErrConnectionTimeout
}
return nil
}
// SetConnection replaces the current connection object
func (se *signerEndpoint) SetConnection(newConnection net.Conn) {
se.connMtx.Lock()
defer se.connMtx.Unlock()
se.conn = newConnection
}
// DropConnection closes and discards the current connection
func (se *signerEndpoint) DropConnection() {
se.connMtx.Lock()
defer se.connMtx.Unlock()
se.dropConnection()
}
// ReadMessage reads a message from the endpoint
func (se *signerEndpoint) ReadMessage() (msg SignerMessage, err error) {
se.connMtx.Lock()
defer se.connMtx.Unlock()
if !se.isConnected() {
return nil, fmt.Errorf("endpoint is not connected")
}
// Reset read deadline
deadline := time.Now().Add(se.timeoutReadWrite)
err = se.conn.SetReadDeadline(deadline)
if err != nil {
return
}
const maxRemoteSignerMsgSize = 1024 * 10
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(se.conn, &msg, maxRemoteSignerMsgSize)
if _, ok := err.(timeoutError); ok {
if err != nil {
err = errors.Wrap(ErrReadTimeout, err.Error())
} else {
err = errors.Wrap(ErrReadTimeout, "Empty error")
}
se.Logger.Debug("Dropping [read]", "obj", se)
se.dropConnection()
}
return
}
// WriteMessage writes a message to the endpoint
func (se *signerEndpoint) WriteMessage(msg SignerMessage) (err error) {
se.connMtx.Lock()
defer se.connMtx.Unlock()
if !se.isConnected() {
return errors.Wrap(ErrNoConnection, "endpoint is not connected")
}
// Reset write deadline
deadline := time.Now().Add(se.timeoutReadWrite)
se.Logger.Debug("Write::Resetting deadline", "obj", se)
err = se.conn.SetWriteDeadline(deadline)
if err != nil {
return
}
_, err = cdc.MarshalBinaryLengthPrefixedWriter(se.conn, msg)
if _, ok := err.(timeoutError); ok {
if err != nil {
err = errors.Wrap(ErrWriteTimeout, err.Error())
} else {
err = errors.Wrap(ErrWriteTimeout, "Empty error")
}
se.dropConnection()
}
return
}
func (se *signerEndpoint) isConnected() bool {
return se.conn != nil
}
func (se *signerEndpoint) dropConnection() {
if se.conn != nil {
if err := se.conn.Close(); err != nil {
se.Logger.Error("signerEndpoint::dropConnection", "err", err)
}
se.conn = nil
}
}


@@ -0,0 +1,198 @@
package privval
import (
"fmt"
"net"
"sync"
"time"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
)
// SignerValidatorEndpointOption sets an optional parameter on the SocketVal.
type SignerValidatorEndpointOption func(*SignerListenerEndpoint)
// SignerListenerEndpoint listens for an external process to dial in
// and keeps the connection alive by dropping and reconnecting
type SignerListenerEndpoint struct {
signerEndpoint
listener net.Listener
connectRequestCh chan struct{}
connectionAvailableCh chan net.Conn
timeoutAccept time.Duration
pingTimer *time.Ticker
instanceMtx sync.Mutex // Ensures instance public methods access, i.e. SendRequest
}
// NewSignerListenerEndpoint returns an instance of SignerListenerEndpoint.
func NewSignerListenerEndpoint(
logger log.Logger,
listener net.Listener,
) *SignerListenerEndpoint {
sc := &SignerListenerEndpoint{
listener: listener,
timeoutAccept: defaultTimeoutAcceptSeconds * time.Second,
}
sc.BaseService = *cmn.NewBaseService(logger, "SignerListenerEndpoint", sc)
sc.signerEndpoint.timeoutReadWrite = defaultTimeoutReadWriteSeconds * time.Second
return sc
}
// OnStart implements cmn.Service.
func (sl *SignerListenerEndpoint) OnStart() error {
sl.connectRequestCh = make(chan struct{})
sl.connectionAvailableCh = make(chan net.Conn)
sl.pingTimer = time.NewTicker(defaultPingPeriodMilliseconds * time.Millisecond)
go sl.serviceLoop()
go sl.pingLoop()
sl.connectRequestCh <- struct{}{}
return nil
}
// OnStop implements cmn.Service
func (sl *SignerListenerEndpoint) OnStop() {
sl.instanceMtx.Lock()
defer sl.instanceMtx.Unlock()
_ = sl.Close()
// Stop listening
if sl.listener != nil {
if err := sl.listener.Close(); err != nil {
sl.Logger.Error("Closing Listener", "err", err)
sl.listener = nil
}
}
sl.pingTimer.Stop()
}
// WaitForConnection waits maxWait for a connection or returns a timeout error
func (sl *SignerListenerEndpoint) WaitForConnection(maxWait time.Duration) error {
sl.instanceMtx.Lock()
defer sl.instanceMtx.Unlock()
return sl.ensureConnection(maxWait)
}
// SendRequest ensures there is a connection, sends a request and waits for a response
func (sl *SignerListenerEndpoint) SendRequest(request SignerMessage) (SignerMessage, error) {
sl.instanceMtx.Lock()
defer sl.instanceMtx.Unlock()
err := sl.ensureConnection(sl.timeoutAccept)
if err != nil {
return nil, err
}
err = sl.WriteMessage(request)
if err != nil {
return nil, err
}
res, err := sl.ReadMessage()
if err != nil {
return nil, err
}
return res, nil
}
func (sl *SignerListenerEndpoint) ensureConnection(maxWait time.Duration) error {
if sl.IsConnected() {
return nil
}
// Is there a connection ready? then use it
if sl.GetAvailableConnection(sl.connectionAvailableCh) {
return nil
}
// block until connected or timeout
sl.triggerConnect()
err := sl.WaitConnection(sl.connectionAvailableCh, maxWait)
if err != nil {
return err
}
return nil
}
func (sl *SignerListenerEndpoint) acceptNewConnection() (net.Conn, error) {
if !sl.IsRunning() || sl.listener == nil {
return nil, fmt.Errorf("endpoint is closing")
}
// wait for a new conn
sl.Logger.Info("SignerListener: Listening for new connection")
conn, err := sl.listener.Accept()
if err != nil {
return nil, err
}
return conn, nil
}
func (sl *SignerListenerEndpoint) triggerConnect() {
select {
case sl.connectRequestCh <- struct{}{}:
default:
}
}
func (sl *SignerListenerEndpoint) triggerReconnect() {
sl.DropConnection()
sl.triggerConnect()
}
func (sl *SignerListenerEndpoint) serviceLoop() {
for {
select {
case <-sl.connectRequestCh:
{
conn, err := sl.acceptNewConnection()
if err == nil {
sl.Logger.Info("SignerListener: Connected")
// We have a good connection; wait for someone who needs it, or for cancellation
select {
case sl.connectionAvailableCh <- conn:
case <-sl.Quit():
return
}
}
select {
case sl.connectRequestCh <- struct{}{}:
default:
}
}
case <-sl.Quit():
return
}
}
}
func (sl *SignerListenerEndpoint) pingLoop() {
for {
select {
case <-sl.pingTimer.C:
{
_, err := sl.SendRequest(&PingRequest{})
if err != nil {
sl.Logger.Error("SignerListener: Ping timeout")
sl.triggerReconnect()
}
}
case <-sl.Quit():
return
}
}
}


@@ -0,0 +1,198 @@
package privval
import (
"net"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
var (
testTimeoutAccept = defaultTimeoutAcceptSeconds * time.Second
testTimeoutReadWrite = 100 * time.Millisecond
testTimeoutReadWrite2o3 = 66 * time.Millisecond // 2/3 of the other one
)
type dialerTestCase struct {
addr string
dialer SocketDialer
}
// TestSignerRemoteRetryTCPOnly will test connection retry attempts over TCP. We
// don't need this for Unix sockets because the OS instantly knows the state of
// both ends of the socket connection. This basically causes the
// SignerDialerEndpoint.dialer() call inside SignerDialerEndpoint.acceptNewConnection() to return
// successfully immediately, putting an instant stop to any retry attempts.
func TestSignerRemoteRetryTCPOnly(t *testing.T) {
var (
attemptCh = make(chan int)
retries = 10
)
ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
// Continuously accept connections and close them immediately, {retries} times
go func(ln net.Listener, attemptCh chan<- int) {
attempts := 0
for {
conn, err := ln.Accept()
require.NoError(t, err)
err = conn.Close()
require.NoError(t, err)
attempts++
if attempts == retries {
attemptCh <- attempts
break
}
}
}(ln, attemptCh)
dialerEndpoint := NewSignerDialerEndpoint(
log.TestingLogger(),
DialTCPFn(ln.Addr().String(), testTimeoutReadWrite, ed25519.GenPrivKey()),
)
SignerDialerEndpointTimeoutReadWrite(time.Millisecond)(dialerEndpoint)
SignerDialerEndpointConnRetries(retries)(dialerEndpoint)
chainID := cmn.RandStr(12)
mockPV := types.NewMockPV()
signerServer := NewSignerServer(dialerEndpoint, chainID, mockPV)
err = signerServer.Start()
require.NoError(t, err)
defer signerServer.Stop()
select {
case attempts := <-attemptCh:
assert.Equal(t, retries, attempts)
case <-time.After(1500 * time.Millisecond):
t.Error("expected remote to observe connection attempts")
}
}
func TestRetryConnToRemoteSigner(t *testing.T) {
for _, tc := range getDialerTestCases(t) {
var (
logger = log.TestingLogger()
chainID = cmn.RandStr(12)
mockPV = types.NewMockPV()
endpointIsOpenCh = make(chan struct{})
thisConnTimeout = testTimeoutReadWrite
listenerEndpoint = newSignerListenerEndpoint(logger, tc.addr, thisConnTimeout)
)
dialerEndpoint := NewSignerDialerEndpoint(
logger,
tc.dialer,
)
SignerDialerEndpointTimeoutReadWrite(testTimeoutReadWrite)(dialerEndpoint)
SignerDialerEndpointConnRetries(10)(dialerEndpoint)
signerServer := NewSignerServer(dialerEndpoint, chainID, mockPV)
startListenerEndpointAsync(t, listenerEndpoint, endpointIsOpenCh)
defer listenerEndpoint.Stop()
require.NoError(t, signerServer.Start())
assert.True(t, signerServer.IsRunning())
<-endpointIsOpenCh
signerServer.Stop()
dialerEndpoint2 := NewSignerDialerEndpoint(
logger,
tc.dialer,
)
signerServer2 := NewSignerServer(dialerEndpoint2, chainID, mockPV)
// let some pings pass
require.NoError(t, signerServer2.Start())
assert.True(t, signerServer2.IsRunning())
defer signerServer2.Stop()
// give the client some time to re-establish the conn to the remote signer
// you should see something like this in the logs:
//
// E[10016-01-10|17:12:46.128] Ping err="remote signer timed out"
// I[10016-01-10|17:16:42.447] Re-created connection to remote signer impl=SocketVal
time.Sleep(testTimeoutReadWrite * 2)
}
}
///////////////////////////////////
func newSignerListenerEndpoint(logger log.Logger, addr string, timeoutReadWrite time.Duration) *SignerListenerEndpoint {
proto, address := cmn.ProtocolAndAddress(addr)
ln, err := net.Listen(proto, address)
if err != nil {
panic(err)
}
logger.Info("SignerListener: Listening", "proto", proto, "address", address)
var listener net.Listener
if proto == "unix" {
unixLn := NewUnixListener(ln)
UnixListenerTimeoutAccept(testTimeoutAccept)(unixLn)
UnixListenerTimeoutReadWrite(timeoutReadWrite)(unixLn)
listener = unixLn
} else {
tcpLn := NewTCPListener(ln, ed25519.GenPrivKey())
TCPListenerTimeoutAccept(testTimeoutAccept)(tcpLn)
TCPListenerTimeoutReadWrite(timeoutReadWrite)(tcpLn)
listener = tcpLn
}
return NewSignerListenerEndpoint(logger, listener)
}
func startListenerEndpointAsync(t *testing.T, sle *SignerListenerEndpoint, endpointIsOpenCh chan struct{}) {
go func(sle *SignerListenerEndpoint) {
require.NoError(t, sle.Start())
assert.True(t, sle.IsRunning())
close(endpointIsOpenCh)
}(sle)
}
func getMockEndpoints(
t *testing.T,
addr string,
socketDialer SocketDialer,
) (*SignerListenerEndpoint, *SignerDialerEndpoint) {
var (
logger = log.TestingLogger()
endpointIsOpenCh = make(chan struct{})
dialerEndpoint = NewSignerDialerEndpoint(
logger,
socketDialer,
)
listenerEndpoint = newSignerListenerEndpoint(logger, addr, testTimeoutReadWrite)
)
SignerDialerEndpointTimeoutReadWrite(testTimeoutReadWrite)(dialerEndpoint)
SignerDialerEndpointConnRetries(1e6)(dialerEndpoint)
startListenerEndpointAsync(t, listenerEndpoint, endpointIsOpenCh)
require.NoError(t, dialerEndpoint.Start())
assert.True(t, dialerEndpoint.IsRunning())
<-endpointIsOpenCh
return listenerEndpoint, dialerEndpoint
}


@@ -1,192 +0,0 @@
package privval
import (
"fmt"
"io"
"net"
"github.com/pkg/errors"
"github.com/tendermint/tendermint/crypto"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/types"
)
// SignerRemote implements PrivValidator.
// It uses a net.Conn to request signatures from an external process.
type SignerRemote struct {
conn net.Conn
// memoized
consensusPubKey crypto.PubKey
}
// Check that SignerRemote implements PrivValidator.
var _ types.PrivValidator = (*SignerRemote)(nil)
// NewSignerRemote returns an instance of SignerRemote.
func NewSignerRemote(conn net.Conn) (*SignerRemote, error) {
// retrieve and memoize the consensus public key once.
pubKey, err := getPubKey(conn)
if err != nil {
return nil, cmn.ErrorWrap(err, "error while retrieving public key for remote signer")
}
return &SignerRemote{
conn: conn,
consensusPubKey: pubKey,
}, nil
}
// Close calls Close on the underlying net.Conn.
func (sc *SignerRemote) Close() error {
return sc.conn.Close()
}
// GetPubKey implements PrivValidator.
func (sc *SignerRemote) GetPubKey() crypto.PubKey {
return sc.consensusPubKey
}
// not thread-safe (only called on startup).
func getPubKey(conn net.Conn) (crypto.PubKey, error) {
err := writeMsg(conn, &PubKeyRequest{})
if err != nil {
return nil, err
}
res, err := readMsg(conn)
if err != nil {
return nil, err
}
pubKeyResp, ok := res.(*PubKeyResponse)
if !ok {
return nil, errors.Wrap(ErrUnexpectedResponse, "response is not PubKeyResponse")
}
if pubKeyResp.Error != nil {
return nil, errors.Wrap(pubKeyResp.Error, "failed to get private validator's public key")
}
return pubKeyResp.PubKey, nil
}
// SignVote implements PrivValidator.
func (sc *SignerRemote) SignVote(chainID string, vote *types.Vote) error {
err := writeMsg(sc.conn, &SignVoteRequest{Vote: vote})
if err != nil {
return err
}
res, err := readMsg(sc.conn)
if err != nil {
return err
}
resp, ok := res.(*SignedVoteResponse)
if !ok {
return ErrUnexpectedResponse
}
if resp.Error != nil {
return resp.Error
}
*vote = *resp.Vote
return nil
}
// SignProposal implements PrivValidator.
func (sc *SignerRemote) SignProposal(chainID string, proposal *types.Proposal) error {
err := writeMsg(sc.conn, &SignProposalRequest{Proposal: proposal})
if err != nil {
return err
}
res, err := readMsg(sc.conn)
if err != nil {
return err
}
resp, ok := res.(*SignedProposalResponse)
if !ok {
return ErrUnexpectedResponse
}
if resp.Error != nil {
return resp.Error
}
*proposal = *resp.Proposal
return nil
}
// Ping is used to check connection health.
func (sc *SignerRemote) Ping() error {
err := writeMsg(sc.conn, &PingRequest{})
if err != nil {
return err
}
res, err := readMsg(sc.conn)
if err != nil {
return err
}
_, ok := res.(*PingResponse)
if !ok {
return ErrUnexpectedResponse
}
return nil
}
func readMsg(r io.Reader) (msg RemoteSignerMsg, err error) {
const maxRemoteSignerMsgSize = 1024 * 10
_, err = cdc.UnmarshalBinaryLengthPrefixedReader(r, &msg, maxRemoteSignerMsgSize)
if _, ok := err.(timeoutError); ok {
err = cmn.ErrorWrap(ErrConnTimeout, err.Error())
}
return
}
func writeMsg(w io.Writer, msg interface{}) (err error) {
_, err = cdc.MarshalBinaryLengthPrefixedWriter(w, msg)
if _, ok := err.(timeoutError); ok {
err = cmn.ErrorWrap(ErrConnTimeout, err.Error())
}
return
}
func handleRequest(req RemoteSignerMsg, chainID string, privVal types.PrivValidator) (RemoteSignerMsg, error) {
var res RemoteSignerMsg
var err error
switch r := req.(type) {
case *PubKeyRequest:
var p crypto.PubKey
p = privVal.GetPubKey()
res = &PubKeyResponse{p, nil}
case *SignVoteRequest:
err = privVal.SignVote(chainID, r.Vote)
if err != nil {
res = &SignedVoteResponse{nil, &RemoteSignerError{0, err.Error()}}
} else {
res = &SignedVoteResponse{r.Vote, nil}
}
case *SignProposalRequest:
err = privVal.SignProposal(chainID, r.Proposal)
if err != nil {
res = &SignedProposalResponse{nil, &RemoteSignerError{0, err.Error()}}
} else {
res = &SignedProposalResponse{r.Proposal, nil}
}
case *PingRequest:
res = &PingResponse{}
default:
err = fmt.Errorf("unknown msg: %v", r)
}
return res, err
}


@@ -1,68 +0,0 @@
package privval
import (
"net"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
// TestSignerRemoteRetryTCPOnly will test connection retry attempts over TCP. We
// don't need this for Unix sockets because the OS instantly knows the state of
// both ends of the socket connection. This basically causes the
// SignerServiceEndpoint.dialer() call inside SignerServiceEndpoint.connect() to return
// successfully immediately, putting an instant stop to any retry attempts.
func TestSignerRemoteRetryTCPOnly(t *testing.T) {
var (
attemptCh = make(chan int)
retries = 2
)
ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
go func(ln net.Listener, attemptCh chan<- int) {
attempts := 0
for {
conn, err := ln.Accept()
require.NoError(t, err)
err = conn.Close()
require.NoError(t, err)
attempts++
if attempts == retries {
attemptCh <- attempts
break
}
}
}(ln, attemptCh)
serviceEndpoint := NewSignerServiceEndpoint(
log.TestingLogger(),
cmn.RandStr(12),
types.NewMockPV(),
DialTCPFn(ln.Addr().String(), testTimeoutReadWrite, ed25519.GenPrivKey()),
)
defer serviceEndpoint.Stop()
SignerServiceEndpointTimeoutReadWrite(time.Millisecond)(serviceEndpoint)
SignerServiceEndpointConnRetries(retries)(serviceEndpoint)
assert.Equal(t, serviceEndpoint.Start(), ErrDialRetryMax)
select {
case attempts := <-attemptCh:
assert.Equal(t, retries, attempts)
case <-time.After(100 * time.Millisecond):
t.Error("expected remote to observe connection attempts")
}
}


@@ -0,0 +1,44 @@
package privval
import (
"fmt"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/types"
)
func DefaultValidationRequestHandler(privVal types.PrivValidator, req SignerMessage, chainID string) (SignerMessage, error) {
var res SignerMessage
var err error
switch r := req.(type) {
case *PubKeyRequest:
var p crypto.PubKey
p = privVal.GetPubKey()
res = &PubKeyResponse{p, nil}
case *SignVoteRequest:
err = privVal.SignVote(chainID, r.Vote)
if err != nil {
res = &SignedVoteResponse{nil, &RemoteSignerError{0, err.Error()}}
} else {
res = &SignedVoteResponse{r.Vote, nil}
}
case *SignProposalRequest:
err = privVal.SignProposal(chainID, r.Proposal)
if err != nil {
res = &SignedProposalResponse{nil, &RemoteSignerError{0, err.Error()}}
} else {
res = &SignedProposalResponse{r.Proposal, nil}
}
case *PingRequest:
err, res = nil, &PingResponse{}
default:
err = fmt.Errorf("unknown msg: %v", r)
}
return res, err
}

privval/signer_server.go

@@ -0,0 +1,107 @@
package privval
import (
"io"
"sync"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/types"
)
// ValidationRequestHandlerFunc handles different remoteSigner requests
type ValidationRequestHandlerFunc func(
privVal types.PrivValidator,
requestMessage SignerMessage,
chainID string) (SignerMessage, error)
type SignerServer struct {
cmn.BaseService
endpoint *SignerDialerEndpoint
chainID string
privVal types.PrivValidator
handlerMtx sync.Mutex
validationRequestHandler ValidationRequestHandlerFunc
}
func NewSignerServer(endpoint *SignerDialerEndpoint, chainID string, privVal types.PrivValidator) *SignerServer {
ss := &SignerServer{
endpoint: endpoint,
chainID: chainID,
privVal: privVal,
validationRequestHandler: DefaultValidationRequestHandler,
}
ss.BaseService = *cmn.NewBaseService(endpoint.Logger, "SignerServer", ss)
return ss
}
// OnStart implements cmn.Service.
func (ss *SignerServer) OnStart() error {
go ss.serviceLoop()
return nil
}
// OnStop implements cmn.Service.
func (ss *SignerServer) OnStop() {
ss.endpoint.Logger.Debug("SignerServer: OnStop calling Close")
_ = ss.endpoint.Close()
}
// SetRequestHandler overrides the default function used to service requests
func (ss *SignerServer) SetRequestHandler(validationRequestHandler ValidationRequestHandlerFunc) {
ss.handlerMtx.Lock()
defer ss.handlerMtx.Unlock()
ss.validationRequestHandler = validationRequestHandler
}
func (ss *SignerServer) servicePendingRequest() {
if !ss.IsRunning() {
return // Ignore error from closing.
}
req, err := ss.endpoint.ReadMessage()
if err != nil {
if err != io.EOF {
ss.Logger.Error("SignerServer: HandleMessage", "err", err)
}
return
}
var res SignerMessage
{
// limit the scope of the lock to the handler call;
// a deferred Unlock would hold the mutex until the function returns
ss.handlerMtx.Lock()
res, err = ss.validationRequestHandler(ss.privVal, req, ss.chainID)
ss.handlerMtx.Unlock()
if err != nil {
// only log the error; we'll reply with an error in res
ss.Logger.Error("SignerServer: handleMessage", "err", err)
}
}
if res != nil {
err = ss.endpoint.WriteMessage(res)
if err != nil {
ss.Logger.Error("SignerServer: writeMessage", "err", err)
}
}
}
func (ss *SignerServer) serviceLoop() {
for {
select {
default:
err := ss.endpoint.ensureConnection()
if err != nil {
return
}
ss.servicePendingRequest()
case <-ss.Quit():
return
}
}
}


@@ -1,139 +0,0 @@
package privval
import (
"io"
"net"
"time"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
// SignerServiceEndpointOption sets an optional parameter on the SignerServiceEndpoint.
type SignerServiceEndpointOption func(*SignerServiceEndpoint)
// SignerServiceEndpointTimeoutReadWrite sets the read and write timeout for connections
// from external signing processes.
func SignerServiceEndpointTimeoutReadWrite(timeout time.Duration) SignerServiceEndpointOption {
return func(ss *SignerServiceEndpoint) { ss.timeoutReadWrite = timeout }
}
// SignerServiceEndpointConnRetries sets the amount of attempted retries to connect.
func SignerServiceEndpointConnRetries(retries int) SignerServiceEndpointOption {
return func(ss *SignerServiceEndpoint) { ss.connRetries = retries }
}
// SignerServiceEndpoint dials using its dialer and responds to any
// signature requests using its privVal.
type SignerServiceEndpoint struct {
cmn.BaseService
chainID string
timeoutReadWrite time.Duration
connRetries int
privVal types.PrivValidator
dialer SocketDialer
conn net.Conn
}
// NewSignerServiceEndpoint returns a SignerServiceEndpoint that will dial using the given
// dialer and respond to any signature requests over the connection
// using the given privVal.
func NewSignerServiceEndpoint(
logger log.Logger,
chainID string,
privVal types.PrivValidator,
dialer SocketDialer,
) *SignerServiceEndpoint {
se := &SignerServiceEndpoint{
chainID: chainID,
timeoutReadWrite: time.Second * defaultTimeoutReadWriteSeconds,
connRetries: defaultMaxDialRetries,
privVal: privVal,
dialer: dialer,
}
se.BaseService = *cmn.NewBaseService(logger, "SignerServiceEndpoint", se)
return se
}
// OnStart implements cmn.Service.
func (se *SignerServiceEndpoint) OnStart() error {
conn, err := se.connect()
if err != nil {
se.Logger.Error("OnStart", "err", err)
return err
}
se.conn = conn
go se.handleConnection(conn)
return nil
}
// OnStop implements cmn.Service.
func (se *SignerServiceEndpoint) OnStop() {
if se.conn == nil {
return
}
if err := se.conn.Close(); err != nil {
se.Logger.Error("OnStop", "err", cmn.ErrorWrap(err, "closing listener failed"))
}
}
func (se *SignerServiceEndpoint) connect() (net.Conn, error) {
for retries := 0; retries < se.connRetries; retries++ {
// Don't sleep if it is the first retry.
if retries > 0 {
time.Sleep(se.timeoutReadWrite)
}
conn, err := se.dialer()
if err == nil {
return conn, nil
}
se.Logger.Error("dialing", "err", err)
}
return nil, ErrDialRetryMax
}
func (se *SignerServiceEndpoint) handleConnection(conn net.Conn) {
for {
if !se.IsRunning() {
return // Ignore error from listener closing.
}
// Reset the connection deadline
deadline := time.Now().Add(se.timeoutReadWrite)
err := conn.SetDeadline(deadline)
if err != nil {
return
}
req, err := readMsg(conn)
if err != nil {
if err != io.EOF {
se.Logger.Error("handleConnection readMsg", "err", err)
}
return
}
res, err := handleRequest(req, se.chainID, se.privVal)
if err != nil {
// only log the error; we'll reply with an error in res
se.Logger.Error("handleConnection handleRequest", "err", err)
}
err = writeMsg(conn, res)
if err != nil {
se.Logger.Error("handleConnection writeMsg", "err", err)
return
}
}
}


@@ -1,230 +0,0 @@
package privval
import (
"fmt"
"net"
"sync"
"time"
"github.com/tendermint/tendermint/crypto"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
const (
defaultHeartbeatSeconds = 2
defaultMaxDialRetries = 10
)
var (
heartbeatPeriod = time.Second * defaultHeartbeatSeconds
)
// SignerValidatorEndpointOption sets an optional parameter on the SocketVal.
type SignerValidatorEndpointOption func(*SignerValidatorEndpoint)
// SignerValidatorEndpointSetHeartbeat sets the period on which to check the liveness of the
// connected Signer connections.
func SignerValidatorEndpointSetHeartbeat(period time.Duration) SignerValidatorEndpointOption {
return func(sc *SignerValidatorEndpoint) { sc.heartbeatPeriod = period }
}
// SocketVal implements PrivValidator.
// It listens for an external process to dial in and uses
// the socket to request signatures.
type SignerValidatorEndpoint struct {
cmn.BaseService
listener net.Listener
// ping
cancelPingCh chan struct{}
pingTicker *time.Ticker
heartbeatPeriod time.Duration
// signer is mutable since it can be reset if the connection fails.
// failures are detected by a background ping routine.
// All messages are request/response, so we hold the mutex
// so only one request/response pair can happen at a time.
// Methods on the underlying net.Conn itself are already goroutine safe.
mtx sync.Mutex
// TODO: Signer should encapsulate and hide the endpoint completely. Invert the relation
signer *SignerRemote
}
// Check that SignerValidatorEndpoint implements PrivValidator.
var _ types.PrivValidator = (*SignerValidatorEndpoint)(nil)
// NewSignerValidatorEndpoint returns an instance of SignerValidatorEndpoint.
func NewSignerValidatorEndpoint(logger log.Logger, listener net.Listener) *SignerValidatorEndpoint {
sc := &SignerValidatorEndpoint{
listener: listener,
heartbeatPeriod: heartbeatPeriod,
}
sc.BaseService = *cmn.NewBaseService(logger, "SignerValidatorEndpoint", sc)
return sc
}
//--------------------------------------------------------
// Implement PrivValidator
// GetPubKey implements PrivValidator.
func (ve *SignerValidatorEndpoint) GetPubKey() crypto.PubKey {
ve.mtx.Lock()
defer ve.mtx.Unlock()
return ve.signer.GetPubKey()
}
// SignVote implements PrivValidator.
func (ve *SignerValidatorEndpoint) SignVote(chainID string, vote *types.Vote) error {
ve.mtx.Lock()
defer ve.mtx.Unlock()
return ve.signer.SignVote(chainID, vote)
}
// SignProposal implements PrivValidator.
func (ve *SignerValidatorEndpoint) SignProposal(chainID string, proposal *types.Proposal) error {
ve.mtx.Lock()
defer ve.mtx.Unlock()
return ve.signer.SignProposal(chainID, proposal)
}
//--------------------------------------------------------
// More thread safe methods proxied to the signer
// Ping is used to check connection health.
func (ve *SignerValidatorEndpoint) Ping() error {
ve.mtx.Lock()
defer ve.mtx.Unlock()
return ve.signer.Ping()
}
// Close closes the underlying net.Conn.
func (ve *SignerValidatorEndpoint) Close() {
ve.mtx.Lock()
defer ve.mtx.Unlock()
if ve.signer != nil {
if err := ve.signer.Close(); err != nil {
ve.Logger.Error("OnStop", "err", err)
}
}
if ve.listener != nil {
if err := ve.listener.Close(); err != nil {
ve.Logger.Error("OnStop", "err", err)
}
}
}
//--------------------------------------------------------
// Service start and stop
// OnStart implements cmn.Service.
func (ve *SignerValidatorEndpoint) OnStart() error {
if closed, err := ve.reset(); err != nil {
ve.Logger.Error("OnStart", "err", err)
return err
} else if closed {
return fmt.Errorf("listener is closed")
}
// Start a routine to keep the connection alive
ve.cancelPingCh = make(chan struct{}, 1)
ve.pingTicker = time.NewTicker(ve.heartbeatPeriod)
go func() {
for {
select {
case <-ve.pingTicker.C:
err := ve.Ping()
if err != nil {
ve.Logger.Error("Ping", "err", err)
if err == ErrUnexpectedResponse {
return
}
closed, err := ve.reset()
if err != nil {
ve.Logger.Error("Reconnecting to remote signer failed", "err", err)
continue
}
if closed {
ve.Logger.Info("listener is closing")
return
}
ve.Logger.Info("Re-created connection to remote signer", "impl", ve)
}
case <-ve.cancelPingCh:
ve.pingTicker.Stop()
return
}
}
}()
return nil
}
// OnStop implements cmn.Service.
func (ve *SignerValidatorEndpoint) OnStop() {
if ve.cancelPingCh != nil {
close(ve.cancelPingCh)
}
ve.Close()
}
//--------------------------------------------------------
// Connection and signer management
// waits to accept and sets a new connection.
// connection is closed in OnStop.
// returns true if the listener is closed
// (ie. it returns a nil conn).
func (ve *SignerValidatorEndpoint) reset() (closed bool, err error) {
ve.mtx.Lock()
defer ve.mtx.Unlock()
// first check if the conn already exists and close it.
if ve.signer != nil {
if tmpErr := ve.signer.Close(); tmpErr != nil {
ve.Logger.Error("error closing socket val connection during reset", "err", tmpErr)
}
}
// wait for a new conn
conn, err := ve.acceptConnection()
if err != nil {
return false, err
}
// listener is closed
if conn == nil {
return true, nil
}
ve.signer, err = NewSignerRemote(conn)
if err != nil {
// failed to fetch the pubkey. close out the connection.
if tmpErr := conn.Close(); tmpErr != nil {
ve.Logger.Error("error closing connection", "err", tmpErr)
}
return false, err
}
return false, nil
}
// Attempt to accept a connection.
// Times out after the listener's timeoutAccept
func (ve *SignerValidatorEndpoint) acceptConnection() (net.Conn, error) {
conn, err := ve.listener.Accept()
if err != nil {
if !ve.IsRunning() {
return nil, nil // Ignore error from listener closing.
}
return nil, err
}
return conn, nil
}


@@ -1,506 +0,0 @@
package privval
import (
"fmt"
"net"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/types"
)
var (
testTimeoutAccept = defaultTimeoutAcceptSeconds * time.Second
testTimeoutReadWrite = 100 * time.Millisecond
testTimeoutReadWrite2o3 = 66 * time.Millisecond // 2/3 of the other one
testTimeoutHeartbeat = 10 * time.Millisecond
testTimeoutHeartbeat3o2 = 6 * time.Millisecond // 3/2 of the other one
)
type socketTestCase struct {
addr string
dialer SocketDialer
}
func socketTestCases(t *testing.T) []socketTestCase {
tcpAddr := fmt.Sprintf("tcp://%s", testFreeTCPAddr(t))
unixFilePath, err := testUnixAddr()
require.NoError(t, err)
unixAddr := fmt.Sprintf("unix://%s", unixFilePath)
return []socketTestCase{
{
addr: tcpAddr,
dialer: DialTCPFn(tcpAddr, testTimeoutReadWrite, ed25519.GenPrivKey()),
},
{
addr: unixAddr,
dialer: DialUnixFn(unixFilePath),
},
}
}
func TestSocketPVAddress(t *testing.T) {
for _, tc := range socketTestCases(t) {
// Execute the test within a closure to ensure the deferred statements
// are called between each for loop iteration, for isolated test cases.
func() {
var (
chainID = cmn.RandStr(12)
validatorEndpoint, serviceEndpoint = testSetupSocketPair(t, chainID, types.NewMockPV(), tc.addr, tc.dialer)
)
defer validatorEndpoint.Stop()
defer serviceEndpoint.Stop()
serviceAddr := serviceEndpoint.privVal.GetPubKey().Address()
validatorAddr := validatorEndpoint.GetPubKey().Address()
assert.Equal(t, serviceAddr, validatorAddr)
}()
}
}
func TestSocketPVPubKey(t *testing.T) {
for _, tc := range socketTestCases(t) {
func() {
var (
chainID = cmn.RandStr(12)
validatorEndpoint, serviceEndpoint = testSetupSocketPair(
t,
chainID,
types.NewMockPV(),
tc.addr,
tc.dialer)
)
defer validatorEndpoint.Stop()
defer serviceEndpoint.Stop()
clientKey := validatorEndpoint.GetPubKey()
privvalPubKey := serviceEndpoint.privVal.GetPubKey()
assert.Equal(t, privvalPubKey, clientKey)
}()
}
}
func TestSocketPVProposal(t *testing.T) {
for _, tc := range socketTestCases(t) {
func() {
var (
chainID = cmn.RandStr(12)
validatorEndpoint, serviceEndpoint = testSetupSocketPair(
t,
chainID,
types.NewMockPV(),
tc.addr,
tc.dialer)
ts = time.Now()
privProposal = &types.Proposal{Timestamp: ts}
clientProposal = &types.Proposal{Timestamp: ts}
)
defer validatorEndpoint.Stop()
defer serviceEndpoint.Stop()
require.NoError(t, serviceEndpoint.privVal.SignProposal(chainID, privProposal))
require.NoError(t, validatorEndpoint.SignProposal(chainID, clientProposal))
assert.Equal(t, privProposal.Signature, clientProposal.Signature)
}()
}
}
func TestSocketPVVote(t *testing.T) {
for _, tc := range socketTestCases(t) {
func() {
var (
chainID = cmn.RandStr(12)
validatorEndpoint, serviceEndpoint = testSetupSocketPair(
t,
chainID,
types.NewMockPV(),
tc.addr,
tc.dialer)
ts = time.Now()
vType = types.PrecommitType
want = &types.Vote{Timestamp: ts, Type: vType}
have = &types.Vote{Timestamp: ts, Type: vType}
)
defer validatorEndpoint.Stop()
defer serviceEndpoint.Stop()
require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want))
require.NoError(t, validatorEndpoint.SignVote(chainID, have))
assert.Equal(t, want.Signature, have.Signature)
}()
}
}
func TestSocketPVVoteResetDeadline(t *testing.T) {
for _, tc := range socketTestCases(t) {
func() {
var (
chainID = cmn.RandStr(12)
validatorEndpoint, serviceEndpoint = testSetupSocketPair(
t,
chainID,
types.NewMockPV(),
tc.addr,
tc.dialer)
ts = time.Now()
vType = types.PrecommitType
want = &types.Vote{Timestamp: ts, Type: vType}
have = &types.Vote{Timestamp: ts, Type: vType}
)
defer validatorEndpoint.Stop()
defer serviceEndpoint.Stop()
time.Sleep(testTimeoutReadWrite2o3)
require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want))
require.NoError(t, validatorEndpoint.SignVote(chainID, have))
assert.Equal(t, want.Signature, have.Signature)
// This would exceed the deadline if it was not extended by the previous message
time.Sleep(testTimeoutReadWrite2o3)
require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want))
require.NoError(t, validatorEndpoint.SignVote(chainID, have))
assert.Equal(t, want.Signature, have.Signature)
}()
}
}
func TestSocketPVVoteKeepalive(t *testing.T) {
for _, tc := range socketTestCases(t) {
func() {
var (
chainID = cmn.RandStr(12)
validatorEndpoint, serviceEndpoint = testSetupSocketPair(
t,
chainID,
types.NewMockPV(),
tc.addr,
tc.dialer)
ts = time.Now()
vType = types.PrecommitType
want = &types.Vote{Timestamp: ts, Type: vType}
have = &types.Vote{Timestamp: ts, Type: vType}
)
defer validatorEndpoint.Stop()
defer serviceEndpoint.Stop()
time.Sleep(testTimeoutReadWrite * 2)
require.NoError(t, serviceEndpoint.privVal.SignVote(chainID, want))
require.NoError(t, validatorEndpoint.SignVote(chainID, have))
assert.Equal(t, want.Signature, have.Signature)
}()
}
}
func TestSocketPVDeadline(t *testing.T) {
for _, tc := range socketTestCases(t) {
func() {
var (
listenc = make(chan struct{})
thisConnTimeout = 100 * time.Millisecond
validatorEndpoint = newSignerValidatorEndpoint(log.TestingLogger(), tc.addr, thisConnTimeout)
)
go func(sc *SignerValidatorEndpoint) {
defer close(listenc)
// Note: the TCP connection times out at the accept() phase,
// whereas the Unix domain sockets connection times out while
// attempting to fetch the remote signer's public key.
assert.True(t, IsConnTimeout(sc.Start()))
assert.False(t, sc.IsRunning())
}(validatorEndpoint)
for {
_, err := cmn.Connect(tc.addr)
if err == nil {
break
}
}
<-listenc
}()
}
}
func TestRemoteSignVoteErrors(t *testing.T) {
for _, tc := range socketTestCases(t) {
func() {
var (
chainID = cmn.RandStr(12)
validatorEndpoint, serviceEndpoint = testSetupSocketPair(
t,
chainID,
types.NewErroringMockPV(),
tc.addr,
tc.dialer)
ts = time.Now()
vType = types.PrecommitType
vote = &types.Vote{Timestamp: ts, Type: vType}
)
defer validatorEndpoint.Stop()
defer serviceEndpoint.Stop()
err := validatorEndpoint.SignVote("", vote)
require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
err = serviceEndpoint.privVal.SignVote(chainID, vote)
require.Error(t, err)
err = validatorEndpoint.SignVote(chainID, vote)
require.Error(t, err)
}()
}
}
func TestRemoteSignProposalErrors(t *testing.T) {
for _, tc := range socketTestCases(t) {
func() {
var (
chainID = cmn.RandStr(12)
validatorEndpoint, serviceEndpoint = testSetupSocketPair(
t,
chainID,
types.NewErroringMockPV(),
tc.addr,
tc.dialer)
ts = time.Now()
proposal = &types.Proposal{Timestamp: ts}
)
defer validatorEndpoint.Stop()
defer serviceEndpoint.Stop()
err := validatorEndpoint.SignProposal("", proposal)
require.Equal(t, err.(*RemoteSignerError).Description, types.ErroringMockPVErr.Error())
err = serviceEndpoint.privVal.SignProposal(chainID, proposal)
require.Error(t, err)
err = validatorEndpoint.SignProposal(chainID, proposal)
require.Error(t, err)
}()
}
}
func TestErrUnexpectedResponse(t *testing.T) {
for _, tc := range socketTestCases(t) {
func() {
var (
logger = log.TestingLogger()
chainID = cmn.RandStr(12)
readyCh = make(chan struct{})
errCh = make(chan error, 1)
serviceEndpoint = NewSignerServiceEndpoint(
logger,
chainID,
types.NewMockPV(),
tc.dialer,
)
validatorEndpoint = newSignerValidatorEndpoint(
logger,
tc.addr,
testTimeoutReadWrite)
)
testStartEndpoint(t, readyCh, validatorEndpoint)
defer validatorEndpoint.Stop()
SignerServiceEndpointTimeoutReadWrite(time.Millisecond)(serviceEndpoint)
SignerServiceEndpointConnRetries(100)(serviceEndpoint)
// we do not want to Start() the remote signer here and instead use the connection to
// reply with intentionally wrong replies below:
rsConn, err := serviceEndpoint.connect()
require.NoError(t, err)
require.NotNil(t, rsConn)
defer rsConn.Close()
// send over public key to get the remote signer running:
go testReadWriteResponse(t, &PubKeyResponse{}, rsConn)
<-readyCh
// Proposal:
go func(errc chan error) {
errc <- validatorEndpoint.SignProposal(chainID, &types.Proposal{})
}(errCh)
// read request and write wrong response:
go testReadWriteResponse(t, &SignedVoteResponse{}, rsConn)
err = <-errCh
require.Error(t, err)
require.Equal(t, err, ErrUnexpectedResponse)
// Vote:
go func(errc chan error) {
errc <- validatorEndpoint.SignVote(chainID, &types.Vote{})
}(errCh)
// read request and write wrong response:
go testReadWriteResponse(t, &SignedProposalResponse{}, rsConn)
err = <-errCh
require.Error(t, err)
require.Equal(t, err, ErrUnexpectedResponse)
}()
}
}
func TestRetryConnToRemoteSigner(t *testing.T) {
for _, tc := range socketTestCases(t) {
func() {
var (
logger = log.TestingLogger()
chainID = cmn.RandStr(12)
readyCh = make(chan struct{})
serviceEndpoint = NewSignerServiceEndpoint(
logger,
chainID,
types.NewMockPV(),
tc.dialer,
)
thisConnTimeout = testTimeoutReadWrite
validatorEndpoint = newSignerValidatorEndpoint(logger, tc.addr, thisConnTimeout)
)
// Ping every:
SignerValidatorEndpointSetHeartbeat(testTimeoutHeartbeat)(validatorEndpoint)
SignerServiceEndpointTimeoutReadWrite(testTimeoutReadWrite)(serviceEndpoint)
SignerServiceEndpointConnRetries(10)(serviceEndpoint)
testStartEndpoint(t, readyCh, validatorEndpoint)
defer validatorEndpoint.Stop()
require.NoError(t, serviceEndpoint.Start())
assert.True(t, serviceEndpoint.IsRunning())
<-readyCh
time.Sleep(testTimeoutHeartbeat * 2)
serviceEndpoint.Stop()
rs2 := NewSignerServiceEndpoint(
logger,
chainID,
types.NewMockPV(),
tc.dialer,
)
// let some pings pass
time.Sleep(testTimeoutHeartbeat3o2)
require.NoError(t, rs2.Start())
assert.True(t, rs2.IsRunning())
defer rs2.Stop()
// give the client some time to re-establish the conn to the remote signer
// should see sth like this in the logs:
//
// E[10016-01-10|17:12:46.128] Ping err="remote signer timed out"
// I[10016-01-10|17:16:42.447] Re-created connection to remote signer impl=SocketVal
time.Sleep(testTimeoutReadWrite * 2)
}()
}
}
func newSignerValidatorEndpoint(logger log.Logger, addr string, timeoutReadWrite time.Duration) *SignerValidatorEndpoint {
proto, address := cmn.ProtocolAndAddress(addr)
ln, err := net.Listen(proto, address)
logger.Info("Listening at", "proto", proto, "address", address)
if err != nil {
panic(err)
}
var listener net.Listener
if proto == "unix" {
unixLn := NewUnixListener(ln)
UnixListenerTimeoutAccept(testTimeoutAccept)(unixLn)
UnixListenerTimeoutReadWrite(timeoutReadWrite)(unixLn)
listener = unixLn
} else {
tcpLn := NewTCPListener(ln, ed25519.GenPrivKey())
TCPListenerTimeoutAccept(testTimeoutAccept)(tcpLn)
TCPListenerTimeoutReadWrite(timeoutReadWrite)(tcpLn)
listener = tcpLn
}
return NewSignerValidatorEndpoint(logger, listener)
}
func testSetupSocketPair(
t *testing.T,
chainID string,
privValidator types.PrivValidator,
addr string,
socketDialer SocketDialer,
) (*SignerValidatorEndpoint, *SignerServiceEndpoint) {
var (
logger = log.TestingLogger()
privVal = privValidator
readyc = make(chan struct{})
serviceEndpoint = NewSignerServiceEndpoint(
logger,
chainID,
privVal,
socketDialer,
)
thisConnTimeout = testTimeoutReadWrite
validatorEndpoint = newSignerValidatorEndpoint(logger, addr, thisConnTimeout)
)
SignerValidatorEndpointSetHeartbeat(testTimeoutHeartbeat)(validatorEndpoint)
SignerServiceEndpointTimeoutReadWrite(testTimeoutReadWrite)(serviceEndpoint)
SignerServiceEndpointConnRetries(1e6)(serviceEndpoint)
testStartEndpoint(t, readyc, validatorEndpoint)
require.NoError(t, serviceEndpoint.Start())
assert.True(t, serviceEndpoint.IsRunning())
<-readyc
return validatorEndpoint, serviceEndpoint
}
func testReadWriteResponse(t *testing.T, resp RemoteSignerMsg, rsConn net.Conn) {
_, err := readMsg(rsConn)
require.NoError(t, err)
err = writeMsg(rsConn, resp)
require.NoError(t, err)
}
func testStartEndpoint(t *testing.T, readyCh chan struct{}, sc *SignerValidatorEndpoint) {
go func(sc *SignerValidatorEndpoint) {
require.NoError(t, sc.Start())
assert.True(t, sc.IsRunning())
readyCh <- struct{}{}
}(sc)
}
// testFreeTCPAddr claims a free port so we don't block on listener being ready.
func testFreeTCPAddr(t *testing.T) string {
ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer ln.Close()
return fmt.Sprintf("127.0.0.1:%d", ln.Addr().(*net.TCPAddr).Port)
}


@@ -1,26 +1,49 @@
package privval
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common"
)
func getDialerTestCases(t *testing.T) []dialerTestCase {
tcpAddr := GetFreeLocalhostAddrPort()
unixFilePath, err := testUnixAddr()
require.NoError(t, err)
unixAddr := fmt.Sprintf("unix://%s", unixFilePath)
return []dialerTestCase{
{
addr: tcpAddr,
dialer: DialTCPFn(tcpAddr, testTimeoutReadWrite, ed25519.GenPrivKey()),
},
{
addr: unixAddr,
dialer: DialUnixFn(unixFilePath),
},
}
}
func TestIsConnTimeoutForFundamentalTimeouts(t *testing.T) {
// Generate a networking timeout
dialer := DialTCPFn(testFreeTCPAddr(t), time.Millisecond, ed25519.GenPrivKey())
tcpAddr := GetFreeLocalhostAddrPort()
dialer := DialTCPFn(tcpAddr, time.Millisecond, ed25519.GenPrivKey())
_, err := dialer()
assert.Error(t, err)
assert.True(t, IsConnTimeout(err))
}
func TestIsConnTimeoutForWrappedConnTimeouts(t *testing.T) {
dialer := DialTCPFn(testFreeTCPAddr(t), time.Millisecond, ed25519.GenPrivKey())
tcpAddr := GetFreeLocalhostAddrPort()
dialer := DialTCPFn(tcpAddr, time.Millisecond, ed25519.GenPrivKey())
_, err := dialer()
assert.Error(t, err)
err = cmn.ErrorWrap(ErrConnTimeout, err.Error())
err = cmn.ErrorWrap(ErrConnectionTimeout, err.Error())
assert.True(t, IsConnTimeout(err))
}


@@ -9,8 +9,8 @@ import (
)
const (
defaultTimeoutAcceptSeconds = 3
defaultTimeoutReadWriteSeconds = 3
defaultTimeoutAcceptSeconds = 3
defaultPingPeriodMilliseconds = 100
)
// timeoutError can be used to check if an error returned from the net package


@@ -1,7 +1,12 @@
package privval
import (
"fmt"
"net"
"github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
)
// IsConnTimeout returns a boolean indicating whether the error is known to
@@ -9,7 +14,7 @@ import (
// network timeouts, as well as ErrConnTimeout errors.
func IsConnTimeout(err error) bool {
if cmnErr, ok := err.(cmn.Error); ok {
if cmnErr.Data() == ErrConnTimeout {
if cmnErr.Data() == ErrConnectionTimeout {
return true
}
}
@@ -18,3 +23,39 @@ func IsConnTimeout(err error) bool {
}
return false
}
// NewSignerListener creates a new SignerListenerEndpoint using the corresponding listen address
func NewSignerListener(listenAddr string, logger log.Logger) (*SignerListenerEndpoint, error) {
var listener net.Listener
protocol, address := cmn.ProtocolAndAddress(listenAddr)
ln, err := net.Listen(protocol, address)
if err != nil {
return nil, err
}
switch protocol {
case "unix":
listener = NewUnixListener(ln)
case "tcp":
// TODO: persist this key so external signer can actually authenticate us
listener = NewTCPListener(ln, ed25519.GenPrivKey())
default:
return nil, fmt.Errorf(
"wrong listen address: expected either 'tcp' or 'unix' protocols, got %s",
protocol,
)
}
pve := NewSignerListenerEndpoint(logger.With("module", "privval"), listener)
return pve, nil
}
// GetFreeLocalhostAddrPort returns a free localhost:port address
func GetFreeLocalhostAddrPort() string {
port, err := cmn.GetFreePort()
if err != nil {
panic(err)
}
return fmt.Sprintf("127.0.0.1:%d", port)
}


@@ -5,6 +5,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
cmn "github.com/tendermint/tendermint/libs/common"
)


@@ -160,7 +160,7 @@ func validatePage(page, perPage, totalCount int) (int, error) {
pages = 1 // one page (even if it's empty)
}
if page < 0 || page > pages {
return 1, fmt.Errorf("page should be within [0, %d] range, given %d", pages, page)
return 1, fmt.Errorf("page should be within [1, %d] range, given %d", pages, page)
}
return page, nil


@@ -49,7 +49,7 @@ var _ error = (*TestHarnessError)(nil)
// with this version of Tendermint.
type TestHarness struct {
addr string
spv *privval.SignerValidatorEndpoint
signerClient *privval.SignerClient
fpv *privval.FilePV
chainID string
acceptRetries int
@@ -101,14 +101,19 @@ func NewTestHarness(logger log.Logger, cfg TestHarnessConfig) (*TestHarness, err
}
logger.Info("Loaded genesis file", "chainID", st.ChainID)
spv, err := newTestHarnessSocketVal(logger, cfg)
spv, err := newTestHarnessListener(logger, cfg)
if err != nil {
return nil, newTestHarnessError(ErrFailedToCreateListener, err, "")
}
signerClient, err := privval.NewSignerClient(spv)
if err != nil {
return nil, newTestHarnessError(ErrFailedToCreateListener, err, "")
}
return &TestHarness{
addr: cfg.BindAddr,
spv: spv,
signerClient: signerClient,
fpv: fpv,
chainID: st.ChainID,
acceptRetries: cfg.AcceptRetries,
@@ -135,9 +140,11 @@ func (th *TestHarness) Run() {
th.logger.Info("Starting test harness")
accepted := false
var startErr error
for acceptRetries := th.acceptRetries; acceptRetries > 0; acceptRetries-- {
th.logger.Info("Attempting to accept incoming connection", "acceptRetries", acceptRetries)
if err := th.spv.Start(); err != nil {
if err := th.signerClient.WaitForConnection(10 * time.Millisecond); err != nil {
// if it wasn't a timeout error
if _, ok := err.(timeoutError); !ok {
th.logger.Error("Failed to start listener", "err", err)
@@ -149,6 +156,7 @@ func (th *TestHarness) Run() {
}
startErr = err
} else {
th.logger.Info("Accepted external connection")
accepted = true
break
}
@@ -182,8 +190,8 @@ func (th *TestHarness) Run() {
func (th *TestHarness) TestPublicKey() error {
th.logger.Info("TEST: Public key of remote signer")
th.logger.Info("Local", "pubKey", th.fpv.GetPubKey())
th.logger.Info("Remote", "pubKey", th.spv.GetPubKey())
if th.fpv.GetPubKey() != th.spv.GetPubKey() {
th.logger.Info("Remote", "pubKey", th.signerClient.GetPubKey())
if th.fpv.GetPubKey() != th.signerClient.GetPubKey() {
th.logger.Error("FAILED: Local and remote public keys do not match")
return newTestHarnessError(ErrTestPublicKeyFailed, nil, "")
}
@@ -211,7 +219,7 @@ func (th *TestHarness) TestSignProposal() error {
Timestamp: time.Now(),
}
propBytes := prop.SignBytes(th.chainID)
if err := th.spv.SignProposal(th.chainID, prop); err != nil {
if err := th.signerClient.SignProposal(th.chainID, prop); err != nil {
th.logger.Error("FAILED: Signing of proposal", "err", err)
return newTestHarnessError(ErrTestSignProposalFailed, err, "")
}
@@ -222,7 +230,7 @@ func (th *TestHarness) TestSignProposal() error {
return newTestHarnessError(ErrTestSignProposalFailed, err, "")
}
// now validate the signature on the proposal
if th.spv.GetPubKey().VerifyBytes(propBytes, prop.Signature) {
if th.signerClient.GetPubKey().VerifyBytes(propBytes, prop.Signature) {
th.logger.Info("Successfully validated proposal signature")
} else {
th.logger.Error("FAILED: Proposal signature validation failed")
@@ -255,7 +263,7 @@ func (th *TestHarness) TestSignVote() error {
}
voteBytes := vote.SignBytes(th.chainID)
// sign the vote
if err := th.spv.SignVote(th.chainID, vote); err != nil {
if err := th.signerClient.SignVote(th.chainID, vote); err != nil {
th.logger.Error("FAILED: Signing of vote", "err", err)
return newTestHarnessError(ErrTestSignVoteFailed, err, fmt.Sprintf("voteType=%d", voteType))
}
@@ -266,7 +274,7 @@ func (th *TestHarness) TestSignVote() error {
return newTestHarnessError(ErrTestSignVoteFailed, err, fmt.Sprintf("voteType=%d", voteType))
}
// now validate the signature on the proposal
if th.spv.GetPubKey().VerifyBytes(voteBytes, vote.Signature) {
if th.signerClient.GetPubKey().VerifyBytes(voteBytes, vote.Signature) {
th.logger.Info("Successfully validated vote signature", "type", voteType)
} else {
th.logger.Error("FAILED: Vote signature validation failed", "type", voteType)
@@ -301,10 +309,9 @@ func (th *TestHarness) Shutdown(err error) {
}()
}
if th.spv.IsRunning() {
if err := th.spv.Stop(); err != nil {
th.logger.Error("Failed to cleanly stop listener: %s", err.Error())
}
err = th.signerClient.Close()
if err != nil {
th.logger.Error("Failed to cleanly stop listener: %s", err.Error())
}
if th.exitWhenComplete {
@@ -312,9 +319,8 @@ func (th *TestHarness) Shutdown(err error) {
}
}
// newTestHarnessSocketVal creates our client instance which we will use for
// testing.
func newTestHarnessSocketVal(logger log.Logger, cfg TestHarnessConfig) (*privval.SignerValidatorEndpoint, error) {
// newTestHarnessListener creates our client instance which we will use for testing.
func newTestHarnessListener(logger log.Logger, cfg TestHarnessConfig) (*privval.SignerListenerEndpoint, error) {
proto, addr := cmn.ProtocolAndAddress(cfg.BindAddr)
if proto == "unix" {
// make sure the socket doesn't exist - if so, try to delete it
@@ -329,7 +335,7 @@ func newTestHarnessSocketVal(logger log.Logger, cfg TestHarnessConfig) (*privval
if err != nil {
return nil, err
}
logger.Info("Listening at", "proto", proto, "addr", addr)
logger.Info("Listening", "proto", proto, "addr", addr)
var svln net.Listener
switch proto {
case "unix":
@@ -347,7 +353,7 @@ func newTestHarnessSocketVal(logger log.Logger, cfg TestHarnessConfig) (*privval
logger.Error("Unsupported protocol (must be unix:// or tcp://)", "proto", proto)
return nil, newTestHarnessError(ErrInvalidParameters, nil, fmt.Sprintf("Unsupported protocol: %s", proto))
}
return privval.NewSignerValidatorEndpoint(logger, svln), nil
return privval.NewSignerListenerEndpoint(logger, svln), nil
}
func newTestHarnessError(code int, err error, info string) *TestHarnessError {


@@ -3,19 +3,18 @@ package internal
import (
"fmt"
"io/ioutil"
"net"
"os"
"testing"
"time"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/types"
)
const (
@@ -85,8 +84,8 @@ func TestRemoteSignerTestHarnessMaxAcceptRetriesReached(t *testing.T) {
func TestRemoteSignerTestHarnessSuccessfulRun(t *testing.T) {
harnessTest(
t,
func(th *TestHarness) *privval.SignerServiceEndpoint {
return newMockRemoteSigner(t, th, th.fpv.Key.PrivKey, false, false)
func(th *TestHarness) *privval.SignerServer {
return newMockSignerServer(t, th, th.fpv.Key.PrivKey, false, false)
},
NoError,
)
@@ -95,8 +94,8 @@ func TestRemoteSignerTestHarnessSuccessfulRun(t *testing.T) {
func TestRemoteSignerPublicKeyCheckFailed(t *testing.T) {
harnessTest(
t,
func(th *TestHarness) *privval.SignerServiceEndpoint {
return newMockRemoteSigner(t, th, ed25519.GenPrivKey(), false, false)
func(th *TestHarness) *privval.SignerServer {
return newMockSignerServer(t, th, ed25519.GenPrivKey(), false, false)
},
ErrTestPublicKeyFailed,
)
@@ -105,8 +104,8 @@ func TestRemoteSignerPublicKeyCheckFailed(t *testing.T) {
func TestRemoteSignerProposalSigningFailed(t *testing.T) {
harnessTest(
t,
func(th *TestHarness) *privval.SignerServiceEndpoint {
return newMockRemoteSigner(t, th, th.fpv.Key.PrivKey, true, false)
func(th *TestHarness) *privval.SignerServer {
return newMockSignerServer(t, th, th.fpv.Key.PrivKey, true, false)
},
ErrTestSignProposalFailed,
)
@@ -115,28 +114,30 @@ func TestRemoteSignerProposalSigningFailed(t *testing.T) {
func TestRemoteSignerVoteSigningFailed(t *testing.T) {
harnessTest(
t,
func(th *TestHarness) *privval.SignerServiceEndpoint {
return newMockRemoteSigner(t, th, th.fpv.Key.PrivKey, false, true)
func(th *TestHarness) *privval.SignerServer {
return newMockSignerServer(t, th, th.fpv.Key.PrivKey, false, true)
},
ErrTestSignVoteFailed,
)
}
func newMockRemoteSigner(t *testing.T, th *TestHarness, privKey crypto.PrivKey, breakProposalSigning bool, breakVoteSigning bool) *privval.SignerServiceEndpoint {
return privval.NewSignerServiceEndpoint(
func newMockSignerServer(t *testing.T, th *TestHarness, privKey crypto.PrivKey, breakProposalSigning bool, breakVoteSigning bool) *privval.SignerServer {
mockPV := types.NewMockPVWithParams(privKey, breakProposalSigning, breakVoteSigning)
dialerEndpoint := privval.NewSignerDialerEndpoint(
th.logger,
th.chainID,
types.NewMockPVWithParams(privKey, breakProposalSigning, breakVoteSigning),
privval.DialTCPFn(
th.addr,
time.Duration(defaultConnDeadline)*time.Millisecond,
ed25519.GenPrivKey(),
),
)
return privval.NewSignerServer(dialerEndpoint, th.chainID, mockPV)
}
// For running relatively standard tests.
func harnessTest(t *testing.T, rsMaker func(th *TestHarness) *privval.SignerServiceEndpoint, expectedExitCode int) {
func harnessTest(t *testing.T, signerServerMaker func(th *TestHarness) *privval.SignerServer, expectedExitCode int) {
cfg := makeConfig(t, 100, 3)
defer cleanup(cfg)
@@ -148,10 +149,10 @@ func harnessTest(t *testing.T, rsMaker func(th *TestHarness) *privval.SignerServ
th.Run()
}()
rs := rsMaker(th)
require.NoError(t, rs.Start())
assert.True(t, rs.IsRunning())
defer rs.Stop()
ss := signerServerMaker(th)
require.NoError(t, ss.Start())
assert.True(t, ss.IsRunning())
defer ss.Stop()
<-donec
assert.Equal(t, expectedExitCode, th.exitCode)
@@ -159,7 +160,7 @@ func harnessTest(t *testing.T, rsMaker func(th *TestHarness) *privval.SignerServ
func makeConfig(t *testing.T, acceptDeadline, acceptRetries int) TestHarnessConfig {
return TestHarnessConfig{
BindAddr: testFreeTCPAddr(t),
BindAddr: privval.GetFreeLocalhostAddrPort(),
KeyFile: makeTempFile("tm-testharness-keyfile", keyFileContents),
StateFile: makeTempFile("tm-testharness-statefile", stateFileContents),
GenesisFile: makeTempFile("tm-testharness-genesisfile", genesisFileContents),
@@ -191,12 +192,3 @@ func makeTempFile(name, content string) string {
}
return tempFile.Name()
}
// testFreeTCPAddr claims a free port so we don't block on listener being ready.
func testFreeTCPAddr(t *testing.T) string {
ln, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
defer ln.Close()
return fmt.Sprintf("127.0.0.1:%d", ln.Addr().(*net.TCPAddr).Port)
}


@@ -12,6 +12,7 @@ import (
// PrivValidator defines the functionality of a local Tendermint validator
// that signs votes and proposals, and never double signs.
type PrivValidator interface {
// TODO: Extend the interface to return errors too. Issue: https://github.com/tendermint/tendermint/issues/3602
GetPubKey() crypto.PubKey
SignVote(chainID string, vote *Vote) error


@@ -20,7 +20,7 @@ const (
// Must be a string because scripts like dist.sh read this file.
// XXX: Don't change the name of this variable or you will break
// automation :)
TMCoreSemVer = "0.32.1"
TMCoreSemVer = "0.32.2"
// ABCISemVer is the semantic version of the ABCI library
ABCISemVer = "0.16.1"