Mirror of https://github.com/fluencelabs/tendermint
(synced 2025-07-30 19:51:58 +00:00)

Compare commits: v0.31.2-rc...v0.31.4 (23 commits)
Commit SHA1s (newest first):
4695414393
def5c8cf12
b6da8880c2
a453628c4e
4e4224213f
b5b3b85697
18d2c45c33
c3df21fe82
bcec8be035
9a415b0572
40da355234
f965a4db15
086d6cbe8c
6cc3f4d87c
3cfd9757a7
882622ec10
1ecf814838
e4a03f249d
56d8aa42b3
79e9f20578
ab24925c94
0ae41cc663
422d04c8ba
90  CHANGELOG.md
@@ -1,5 +1,95 @@
# Changelog

## v0.31.4

*April 12th, 2019*

This release fixes a regression from v0.31.3 which used the peer's `SocketAddr` to add the peer to
the address book. This swallowed the peer's self-reported port, which is important in case of reconnect.
It brings back `NetAddress()` to `NodeInfo` and uses it instead of `SocketAddr` for adding peers.
Additionally, it improves response time on the `/validators` and `/status` RPC endpoints.
As a side effect, it makes these RPC endpoints more difficult to DoS and fixes a performance degradation in `ExecCommitBlock`.
Also, it contains an [ADR](https://github.com/tendermint/tendermint/pull/3539) that proposes decoupling the
responsibility for peer behaviour from the `p2p.Switch` (by @brapse).

Special thanks to external contributors on this release:
@brapse, @guagualvcha, @mydring

### IMPROVEMENTS:

- [p2p] [\#3463](https://github.com/tendermint/tendermint/pull/3463) Do not log "Can't add peer's address to addrbook" error for a private peer
- [p2p] [\#3547](https://github.com/tendermint/tendermint/pull/3547) Fix a couple of annoying typos (@mdyring)

### BUG FIXES:

- [docs] [\#3514](https://github.com/tendermint/tendermint/issues/3514) Fix block.Header.Time description (@melekes)
- [p2p] [\#2716](https://github.com/tendermint/tendermint/issues/2716) Check if we're already connected to peer right before dialing it (@melekes)
- [p2p] [\#3545](https://github.com/tendermint/tendermint/issues/3545) Add back `NetAddress()` to `NodeInfo` and use it instead of peer's `SocketAddr()` when adding a peer to the `PEXReactor` (potential fix for [\#3532](https://github.com/tendermint/tendermint/issues/3532))
- [state] [\#3438](https://github.com/tendermint/tendermint/pull/3438)
  Persist validators every 100000 blocks even if no changes to the set
  occurred (@guagualvcha). This
  1) Prevents a possible DoS attack using the `/validators` or `/status` RPC
  endpoints. Before, response time was growing linearly with height if no
  changes were made to the validator set.
  2) Fixes performance degradation in `ExecCommitBlock` where we call
  `LoadValidators` for each `Evidence` in the block.

## v0.31.3

*April 1st, 2019*

This release includes two security sensitive fixes: it ensures generated private
keys are valid, and it prevents certain DNS lookups that would cause the node to
panic if the lookup failed.

### BREAKING CHANGES:

* Go API
  - [crypto/secp256k1] [\#3439](https://github.com/tendermint/tendermint/issues/3439)
    The `secp256k1.GenPrivKeySecp256k1` function has changed to guarantee that it returns a valid key, which means it
    will return a different private key than in previous versions for the same secret.

### BUG FIXES:

- [crypto/secp256k1] [\#3439](https://github.com/tendermint/tendermint/issues/3439)
  Ensure generated private keys are valid by randomly sampling until a valid key is found.
  Previously, it was possible (though rare!) to generate keys that exceeded the curve order.
  Such keys would lead to invalid signatures.
- [p2p] [\#3522](https://github.com/tendermint/tendermint/issues/3522) Memoize
  socket address in peer connections to avoid DNS lookups. Previously, failed
  DNS lookups could cause the node to panic.

## v0.31.2

*March 30th, 2019*

This release fixes a regression from v0.31.1 where Tendermint panics under
mempool load for external ABCI apps.

Special thanks to external contributors on this release:
@guagualvcha

### BREAKING CHANGES:

* CLI/RPC/Config

* Apps

* Go API
  - [libs/autofile] [\#3504](https://github.com/tendermint/tendermint/issues/3504) Remove unused code in autofile package. Deleted functions: `Group.Search`, `Group.FindLast`, `GroupReader.ReadLine`, `GroupReader.PushLine`, `MakeSimpleSearchFunc` (@guagualvcha)

* Blockchain Protocol

* P2P Protocol

### FEATURES:

### IMPROVEMENTS:

- [circle] [\#3497](https://github.com/tendermint/tendermint/issues/3497) Move release management to CircleCI

### BUG FIXES:

- [mempool] [\#3512](https://github.com/tendermint/tendermint/issues/3512) Fix panic from concurrent access to txsMap, a regression for external ABCI apps introduced in v0.31.1

## v0.31.1

*March 27th, 2019*
@@ -1,9 +1,6 @@
-## v0.31.2
+## v0.31.5

-*March 30th, 2019*
-
-This release fixes a regression from v0.31.1 where Tendermint panics under
-mempool load for external ABCI apps.
+**

### BREAKING CHANGES:

@@ -12,7 +9,6 @@ mempool load for external ABCI apps.
* Apps

* Go API
-- [libs/autofile] \#3504 Remove unused code in autofile package. Deleted functions: `Group.Search`, `Group.FindLast`, `GroupReader.ReadLine`, `GroupReader.PushLine`, `MakeSimpleSearchFunc` (@guagualvcha)

* Blockchain Protocol

@@ -22,8 +18,4 @@ mempool load for external ABCI apps.

### IMPROVEMENTS:

-- [circle] \#3497 Move release management to CircleCI

### BUG FIXES:

-- [mempool] \#3512 Fix panic from concurrent access to txsMap, a regression for external ABCI apps introduced in v0.31.1
@@ -210,7 +210,7 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error {
	cli.reqSent.Remove(next) // Pop first item from linked list

	// Notify reqRes listener if set (request specific callback).
-	// NOTE: it is possible this callback isn't set on the reqres object
+	// NOTE: it is possible this callback isn't set on the reqres object.
	// at this point, in which case it will be called after, when it is set.
	// TODO: should we move this after the resCb call so the order is always consistent?
	if cb := reqres.GetCallback(); cb != nil {
@@ -6,6 +6,7 @@ import (
	"crypto/subtle"
	"fmt"
	"io"
+	"math/big"

	"golang.org/x/crypto/ripemd160"

@@ -65,32 +66,61 @@ func (privKey PrivKeySecp256k1) Equals(other crypto.PrivKey) bool {
}

// GenPrivKey generates a new ECDSA private key on curve secp256k1 private key.
-// It uses OS randomness in conjunction with the current global random seed
-// in tendermint/libs/common to generate the private key.
+// It uses OS randomness to generate the private key.
func GenPrivKey() PrivKeySecp256k1 {
	return genPrivKey(crypto.CReader())
}

// genPrivKey generates a new secp256k1 private key using the provided reader.
func genPrivKey(rand io.Reader) PrivKeySecp256k1 {
-	privKeyBytes := [32]byte{}
-	_, err := io.ReadFull(rand, privKeyBytes[:])
-	if err != nil {
-		panic(err)
+	var privKeyBytes [32]byte
+	d := new(big.Int)
+	for {
+		privKeyBytes = [32]byte{}
+		_, err := io.ReadFull(rand, privKeyBytes[:])
+		if err != nil {
+			panic(err)
+		}
+
+		d.SetBytes(privKeyBytes[:])
+		// break if we found a valid point (i.e. > 0 and < N == curve order)
+		isValidFieldElement := 0 < d.Sign() && d.Cmp(secp256k1.S256().N) < 0
+		if isValidFieldElement {
+			break
+		}
	}
-	// crypto.CRandBytes is guaranteed to be 32 bytes long, so it can be
-	// casted to PrivKeySecp256k1.

	return PrivKeySecp256k1(privKeyBytes)
}

+var one = new(big.Int).SetInt64(1)
+
// GenPrivKeySecp256k1 hashes the secret with SHA2, and uses
// that 32 byte output to create the private key.
+//
+// It makes sure the private key is a valid field element by setting:
+//
+// c = sha256(secret)
+// k = (c mod (n − 1)) + 1, where n = curve order.
+//
// NOTE: secret should be the output of a KDF like bcrypt,
// if it's derived from user input.
func GenPrivKeySecp256k1(secret []byte) PrivKeySecp256k1 {
-	privKey32 := sha256.Sum256(secret)
-	// sha256.Sum256() is guaranteed to be 32 bytes long, so it can be
-	// casted to PrivKeySecp256k1.
+	secHash := sha256.Sum256(secret)
+	// to guarantee that we have a valid field element, we use the approach of:
+	// "Suite B Implementer’s Guide to FIPS 186-3", A.2.1
+	// https://apps.nsa.gov/iaarchive/library/ia-guidance/ia-solutions-for-classified/algorithm-guidance/suite-b-implementers-guide-to-fips-186-3-ecdsa.cfm
+	// see also https://github.com/golang/go/blob/0380c9ad38843d523d9c9804fe300cb7edd7cd3c/src/crypto/ecdsa/ecdsa.go#L89-L101
+	fe := new(big.Int).SetBytes(secHash[:])
+	n := new(big.Int).Sub(secp256k1.S256().N, one)
+	fe.Mod(fe, n)
+	fe.Add(fe, one)
+
+	feB := fe.Bytes()
+	var privKey32 [32]byte
+	// copy feB over to fixed 32 byte privKey32 and pad (if necessary)
+	copy(privKey32[32-len(feB):32], feB)
+
	return PrivKeySecp256k1(privKey32)
}
39  crypto/secp256k1/secp256k1_cgo_test.go (new file)
@@ -0,0 +1,39 @@
// +build libsecp256k1

package secp256k1

import (
	"github.com/magiconair/properties/assert"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestPrivKeySecp256k1SignVerify(t *testing.T) {
	msg := []byte("A.1.2 ECC Key Pair Generation by Testing Candidates")
	priv := GenPrivKey()
	tests := []struct {
		name             string
		privKey          PrivKeySecp256k1
		wantSignErr      bool
		wantVerifyPasses bool
	}{
		{name: "valid sign-verify round", privKey: priv, wantSignErr: false, wantVerifyPasses: true},
		{name: "invalid private key", privKey: [32]byte{}, wantSignErr: true, wantVerifyPasses: false},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, err := tt.privKey.Sign(msg)
			if tt.wantSignErr {
				require.Error(t, err)
				t.Logf("Got error: %s", err)
				return
			}
			require.NoError(t, err)
			require.NotNil(t, got)

			pub := tt.privKey.PubKey()
			assert.Equal(t, tt.wantVerifyPasses, pub.VerifyBytes(msg, got))
		})
	}
}
45  crypto/secp256k1/secp256k1_internal_test.go (new file)
@@ -0,0 +1,45 @@
package secp256k1

import (
	"bytes"
	"math/big"
	"testing"

	"github.com/stretchr/testify/require"

	underlyingSecp256k1 "github.com/btcsuite/btcd/btcec"
)

func Test_genPrivKey(t *testing.T) {
	empty := make([]byte, 32)
	oneB := big.NewInt(1).Bytes()
	onePadded := make([]byte, 32)
	copy(onePadded[32-len(oneB):32], oneB)
	t.Logf("one padded: %v, len=%v", onePadded, len(onePadded))

	validOne := append(empty, onePadded...)
	tests := []struct {
		name        string
		notSoRand   []byte
		shouldPanic bool
	}{
		{"empty bytes (panics because 1st 32 bytes are zero and 0 is not a valid field element)", empty, true},
		{"curve order: N", underlyingSecp256k1.S256().N.Bytes(), true},
		{"valid because 0 < 1 < N", validOne, false},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if tt.shouldPanic {
				require.Panics(t, func() {
					genPrivKey(bytes.NewReader(tt.notSoRand))
				})
				return
			}
			got := genPrivKey(bytes.NewReader(tt.notSoRand))
			fe := new(big.Int).SetBytes(got[:])
			require.True(t, fe.Cmp(underlyingSecp256k1.S256().N) < 0)
			require.True(t, fe.Sign() > 0)
		})
	}
}
@@ -6,7 +6,6 @@ import (
	"testing"

	secp256k1 "github.com/btcsuite/btcd/btcec"
-
	"github.com/stretchr/testify/require"
)
@@ -2,6 +2,7 @@ package secp256k1_test

import (
	"encoding/hex"
+	"math/big"
	"testing"

	"github.com/btcsuite/btcutil/base58"

@@ -84,3 +85,28 @@ func TestSecp256k1LoadPrivkeyAndSerializeIsIdentity(t *testing.T) {
		require.Equal(t, privKeyBytes[:], serializedBytes)
	}
}
+
+func TestGenPrivKeySecp256k1(t *testing.T) {
+	// curve order N
+	N := underlyingSecp256k1.S256().N
+	tests := []struct {
+		name   string
+		secret []byte
+	}{
+		{"empty secret", []byte{}},
+		{"some long secret", []byte("We live in a society exquisitely dependent on science and technology, in which hardly anyone knows anything about science and technology.")},
+		{"another seed used in cosmos tests #1", []byte{0}},
+		{"another seed used in cosmos tests #2", []byte("mySecret")},
+		{"another seed used in cosmos tests #3", []byte("")},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			gotPrivKey := secp256k1.GenPrivKeySecp256k1(tt.secret)
+			require.NotNil(t, gotPrivKey)
+			// interpret as a big.Int and make sure it is a valid field element:
+			fe := new(big.Int).SetBytes(gotPrivKey[:])
+			require.True(t, fe.Cmp(N) < 0)
+			require.True(t, fe.Sign() > 0)
+		})
+	}
+}
145  docs/architecture/adr-037-peer-behaviour.md (new file)
@@ -0,0 +1,145 @@
# ADR 037: Peer Behaviour Interface

## Changelog
* 07-03-2019: Initial draft

## Context

The responsibility for signaling and acting upon peer behaviour lacks a single
owning component and is heavily coupled with the network stack[<sup>1</sup>](#references). Reactors
maintain a reference to the `p2p.Switch`, which they use to call
`switch.StopPeerForError(...)` when a peer misbehaves and
`switch.MarkAsGood(...)` when a peer contributes in some meaningful way.
While the switch handles `StopPeerForError` internally, the `MarkAsGood`
method delegates to another component, `p2p.AddrBook`. This scheme of delegation
across the Switch obscures the responsibility for handling peer behaviour
and ties the reactors into a larger dependency graph when testing.

## Decision

Introduce a `PeerBehaviour` interface and concrete implementations which
provide methods for reactors to signal peer behaviour without directly
coupling to the `p2p.Switch`. Introduce an `ErrPeer` type to provide
concrete reasons for stopping peers.

### Implementation Changes

PeerBehaviour then becomes an interface for signaling peer errors as well
as for marking peers as `good`.

XXX: It might be better to pass p2p.ID instead of the whole peer but as
a first draft maintain the underlying implementation as much as
possible.

```go
type PeerBehaviour interface {
    Errored(peer Peer, reason ErrPeer)
    MarkPeerAsGood(peer Peer)
}
```

Instead of signaling peers to stop with arbitrary reasons
(`reason interface{}`), we introduce a concrete error type `ErrPeer`:

```go
type ErrPeer int

const (
    ErrPeerUnknown ErrPeer = iota
    ErrPeerBadMessage
    ErrPeerMessageOutofOrder
    ...
)
```

As a first iteration we provide a concrete implementation which wraps
the switch:

```go
type SwitchedPeerBehaviour struct {
    sw *Switch
}

func (spb *SwitchedPeerBehaviour) Errored(peer Peer, reason ErrPeer) {
    spb.sw.StopPeerForError(peer, reason)
}

func (spb *SwitchedPeerBehaviour) MarkPeerAsGood(peer Peer) {
    spb.sw.MarkPeerAsGood(peer)
}

func NewSwitchedPeerBehaviour(sw *Switch) *SwitchedPeerBehaviour {
    return &SwitchedPeerBehaviour{
        sw: sw,
    }
}
```

Reactors, which are often difficult to unit test[<sup>2</sup>](#references), could use an implementation which exposes the signals produced by the reactor in
manufactured scenarios:

```go
type PeerErrors map[Peer][]ErrPeer
type GoodPeers map[Peer]bool

type StorePeerBehaviour struct {
    pe PeerErrors
    gp GoodPeers
}

func NewStorePeerBehaviour() *StorePeerBehaviour {
    return &StorePeerBehaviour{
        pe: make(PeerErrors),
        gp: GoodPeers{},
    }
}

func (spb *StorePeerBehaviour) Errored(peer Peer, reason ErrPeer) {
    if _, ok := spb.pe[peer]; !ok {
        spb.pe[peer] = []ErrPeer{reason}
    } else {
        spb.pe[peer] = append(spb.pe[peer], reason)
    }
}

func (spb *StorePeerBehaviour) GetPeerErrors() PeerErrors {
    return spb.pe
}

func (spb *StorePeerBehaviour) MarkPeerAsGood(peer Peer) {
    if _, ok := spb.gp[peer]; !ok {
        spb.gp[peer] = true
    }
}

func (spb *StorePeerBehaviour) GetGoodPeers() GoodPeers {
    return spb.gp
}
```

## Status

Proposed

## Consequences

### Positive

* De-couples signaling from acting upon peer behaviour.
* Reduces the coupling between reactors, the Switch, and the network stack.
* The responsibility for managing peer behaviour can be migrated to
  a single component instead of being split between the switch and the
  address book.

### Negative

* The first iteration will simply wrap the Switch and introduce a
  level of indirection.

### Neutral

## References

1. Issue [#2067](https://github.com/tendermint/tendermint/issues/2067): P2P Refactor
2. PR [#3506](https://github.com/tendermint/tendermint/pull/3506): ADR 036: Blockchain Reactor Refactor
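A sketch of how a reactor test might use the ADR's `StorePeerBehaviour`: it records signals instead of acting on them, so a test can assert what a reactor reported without spinning up a Switch. This is a simplified stand-in, with `Peer` reduced to a plain ID string rather than the real `p2p.Peer` interface.

```go
package main

import "fmt"

// Simplified stand-ins for the ADR's types: Peer is reduced to a string ID.
type ErrPeer int

const (
	ErrPeerUnknown ErrPeer = iota
	ErrPeerBadMessage
)

// StorePeerBehaviour records signals for later inspection.
type StorePeerBehaviour struct {
	pe map[string][]ErrPeer
	gp map[string]bool
}

func NewStorePeerBehaviour() *StorePeerBehaviour {
	return &StorePeerBehaviour{pe: map[string][]ErrPeer{}, gp: map[string]bool{}}
}

func (s *StorePeerBehaviour) Errored(peer string, reason ErrPeer) {
	s.pe[peer] = append(s.pe[peer], reason)
}

func (s *StorePeerBehaviour) MarkPeerAsGood(peer string) { s.gp[peer] = true }

func main() {
	b := NewStorePeerBehaviour()
	// A reactor under test would receive b and signal through it:
	b.Errored("peer1", ErrPeerBadMessage)
	b.MarkPeerAsGood("peer2")
	fmt.Println(len(b.pe["peer1"]), b.gp["peer2"]) // 1 true
}
```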
|
@@ -347,8 +347,10 @@ Commit are included in the header of the next block.
- `Version (Version)`: Version of the blockchain and the application
- `ChainID (string)`: ID of the blockchain
- `Height (int64)`: Height of the block in the chain
-- `Time (google.protobuf.Timestamp)`: Time of the block. It is the proposer's
-  local time when block was created.
+- `Time (google.protobuf.Timestamp)`: Time of the previous block.
+  For heights > 1, it's the weighted median of the timestamps of the valid
+  votes in the block.LastCommit.
+  For height == 1, it's genesis time.
- `NumTxs (int32)`: Number of transactions in the block
- `TotalTxs (int64)`: Total number of transactions in the blockchain until
  now
@@ -31,8 +31,13 @@ states to the latest committed state at once.

When `Commit` completes, it unlocks the mempool.

-Note that it is not possible to send transactions to Tendermint during `Commit` - if your app
-tries to send a `/broadcast_tx` to Tendermint during Commit, it will deadlock.
+WARNING: if the ABCI app logic processing the `Commit` message sends a
+`/broadcast_tx_sync` or `/broadcast_tx_commit` and waits for the response
+before proceeding, it will deadlock. Executing those `broadcast_tx` calls
+involves acquiring a lock that is held during the `Commit` call, so it's not
+possible. If you make the call to the `broadcast_tx` endpoints concurrently,
+that's no problem, it just can't be part of the sequential logic of the
+`Commit` function.

### Consensus Connection
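The shape of the deadlock the new warning describes can be illustrated with a plain mutex standing in for the mempool lock (simplified stand-ins, not the real Tendermint types): `Commit` runs with the lock held, and the `broadcast_tx` path needs the same lock, so a synchronous call from inside `Commit` can never acquire it, while a concurrent caller simply waits until `Commit` returns.

```go
package main

import (
	"fmt"
	"sync"
)

var mempoolMtx sync.Mutex

// broadcastTx models the lock acquisition inside the broadcast_tx endpoints.
// TryLock (Go 1.18+) lets the example report failure instead of hanging.
func broadcastTx() bool {
	if !mempoolMtx.TryLock() {
		return false // a real blocking Lock() here would deadlock
	}
	defer mempoolMtx.Unlock()
	return true
}

// commit models ABCI Commit: it runs with the mempool lock held.
func commit() bool {
	mempoolMtx.Lock()
	defer mempoolMtx.Unlock()
	return broadcastTx() // sequential call while holding the lock
}

func main() {
	fmt.Println(commit())      // false: broadcast_tx cannot run inside Commit
	fmt.Println(broadcastTx()) // true: fine once Commit has released the lock
}
```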
@@ -244,7 +244,7 @@ The height is an incrementing integer. The first block has `block.Header.Height

### Time

```
-block.Header.Timestamp >= prevBlock.Header.Timestamp + 1 ms
+block.Header.Timestamp >= prevBlock.Header.Timestamp + state.consensusParams.Block.TimeIotaMs
block.Header.Timestamp == MedianTime(block.LastCommit, state.LastValidators)
```
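One plausible reading of `MedianTime` — the weighted median of the commit vote timestamps, with validator voting powers as weights — can be sketched as follows. This is an illustration of the idea, not the exact consensus implementation; times are plain unix seconds and the tie-breaking rule is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

type vote struct {
	time  int64 // vote timestamp, unix seconds
	power int64 // voting power of the validator that cast it
}

// weightedMedian returns the timestamp at which cumulative voting power
// first exceeds half of the total.
func weightedMedian(votes []vote) int64 {
	sort.Slice(votes, func(i, j int) bool { return votes[i].time < votes[j].time })
	var total int64
	for _, v := range votes {
		total += v.power
	}
	half := total / 2
	var acc int64
	for _, v := range votes {
		acc += v.power
		if acc > half {
			return v.time
		}
	}
	return votes[len(votes)-1].time
}

func main() {
	votes := []vote{{100, 1}, {105, 3}, {110, 1}}
	fmt.Println(weightedMedian(votes)) // 105: the median power unit falls here
}
```

Note how a single high-power validator can dominate the median, which is why the rule only counts timestamps from valid votes in `block.LastCommit`.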
@@ -19,7 +19,7 @@ func TestCacheRemove(t *testing.T) {
	for i := 0; i < numTxs; i++ {
		// probability of collision is 2**-256
		txBytes := make([]byte, 32)
-		rand.Read(txBytes)
+		rand.Read(txBytes) // nolint: gosec
		txs[i] = txBytes
		cache.Push(txBytes)
		// make sure its added to both the linked list and the map
@@ -167,8 +167,8 @@ type Mempool struct {
	preCheck  PreCheckFunc
	postCheck PostCheckFunc

-	// track whether we're rechecking txs.
-	// these are not protected by a mutex and are expected to be mutated
+	// Track whether we're rechecking txs.
+	// These are not protected by a mutex and are expected to be mutated
	// in serial (ie. by abci responses which are called in serial).
	recheckCursor *clist.CElement // next expected response
	recheckEnd    *clist.CElement // re-checking stops here

@@ -177,7 +177,7 @@ type Mempool struct {
	notifiedTxsAvailable bool
	txsAvailable         chan struct{} // fires once for each height, when the mempool is not empty

-	// map for quick access to txs to record sender in CheckTx.
+	// Map for quick access to txs to record sender in CheckTx.
	// txsMap: txKey -> CElement
	txsMap sync.Map
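A minimal sketch of why `txsMap` is a `sync.Map`: CheckTx callbacks and reactor goroutines touch the map concurrently, which panics with a plain Go map ("concurrent map writes" — the regression fixed in v0.31.2) but is safe with `sync.Map`. The helper below is illustrative, not Tendermint code.

```go
package main

import (
	"fmt"
	"sync"
)

// storeConcurrently writes and reads n entries from many goroutines at once;
// with a plain map this would panic, with sync.Map it is safe.
func storeConcurrently(n int) int {
	var txsMap sync.Map
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			key := fmt.Sprintf("tx%d", i)
			txsMap.Store(key, i) // writer side: record sender in CheckTx
			txsMap.Load(key)     // concurrent reader side
		}(i)
	}
	wg.Wait()
	count := 0
	txsMap.Range(func(_, _ interface{}) bool { count++; return true })
	return count
}

func main() {
	fmt.Println(storeConcurrently(100)) // 100: all entries stored safely
}
```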
@@ -461,11 +461,8 @@ func (mem *Mempool) globalCb(req *abci.Request, res *abci.Response) {
func (mem *Mempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abci.Response)) func(res *abci.Response) {
	return func(res *abci.Response) {
		if mem.recheckCursor != nil {
-			// this should not be possible.
-			// rechecks should only happen during Update
-			// after all checktxs were flushed and before
-			// any new ones happened.
-			return
+			// this should never happen
+			panic("recheck cursor is not nil in reqResCb")
		}

		mem.resCbFirstTime(tx, peerID, res)
@@ -489,7 +489,7 @@ func NewNode(config *cfg.Config,
	addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)

	// Add ourselves to addrbook to prevent dialing ourselves
-	addrBook.AddOurAddress(nodeInfo.NetAddress())
+	addrBook.AddOurAddress(sw.NetAddress())

	addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
	if config.P2P.PexReactor {

@@ -498,6 +498,12 @@ func NewNode(config *cfg.Config,
			&pex.PEXReactorConfig{
				Seeds:    splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
				SeedMode: config.P2P.SeedMode,
+				// See consensus/reactor.go: blocksToContributeToBecomeGoodPeer 10000
+				// blocks assuming 10s blocks ~ 28 hours.
+				// TODO (melekes): make it dynamic based on the actual block latencies
+				// from the live network.
+				// https://github.com/tendermint/tendermint/issues/3523
+				SeedDisconnectWaitPeriod: 28 * time.Hour,
			})
		pexReactor.SetLogger(logger.With("module", "pex"))
		sw.AddReactor("PEX", pexReactor)
@@ -103,7 +103,7 @@ type ErrSwitchDuplicatePeerID struct {
}

func (e ErrSwitchDuplicatePeerID) Error() string {
-	return fmt.Sprintf("Duplicate peer ID %v", e.ID)
+	return fmt.Sprintf("duplicate peer ID %v", e.ID)
}

// ErrSwitchDuplicatePeerIP to be raised when a peer is connecting with a known

@@ -113,7 +113,7 @@ type ErrSwitchDuplicatePeerIP struct {
}

func (e ErrSwitchDuplicatePeerIP) Error() string {
-	return fmt.Sprintf("Duplicate peer IP %v", e.IP.String())
+	return fmt.Sprintf("duplicate peer IP %v", e.IP.String())
}

// ErrSwitchConnectToSelf to be raised when trying to connect to itself.

@@ -122,7 +122,7 @@ type ErrSwitchConnectToSelf struct {
}

func (e ErrSwitchConnectToSelf) Error() string {
-	return fmt.Sprintf("Connect to self: %v", e.Addr)
+	return fmt.Sprintf("connect to self: %v", e.Addr)
}

type ErrSwitchAuthenticationFailure struct {

@@ -132,7 +132,7 @@ type ErrSwitchAuthenticationFailure struct {

func (e ErrSwitchAuthenticationFailure) Error() string {
	return fmt.Sprintf(
-		"Failed to authenticate peer. Dialed %v, but got peer with ID %s",
+		"failed to authenticate peer. Dialed %v, but got peer with ID %s",
		e.Dialed,
		e.Got,
	)

@@ -152,7 +152,7 @@ type ErrNetAddressNoID struct {
}

func (e ErrNetAddressNoID) Error() string {
-	return fmt.Sprintf("Address (%s) does not contain ID", e.Addr)
+	return fmt.Sprintf("address (%s) does not contain ID", e.Addr)
}

type ErrNetAddressInvalid struct {

@@ -161,7 +161,7 @@ type ErrNetAddressInvalid struct {
}

func (e ErrNetAddressInvalid) Error() string {
-	return fmt.Sprintf("Invalid address (%s): %v", e.Addr, e.Err)
+	return fmt.Sprintf("invalid address (%s): %v", e.Addr, e.Err)
}

type ErrNetAddressLookup struct {

@@ -170,5 +170,15 @@ type ErrNetAddressLookup struct {
}

func (e ErrNetAddressLookup) Error() string {
-	return fmt.Sprintf("Error looking up host (%s): %v", e.Addr, e.Err)
+	return fmt.Sprintf("error looking up host (%s): %v", e.Addr, e.Err)
}
+
+// ErrCurrentlyDialingOrExistingAddress indicates that we're currently
+// dialing this address or it belongs to an existing peer.
+type ErrCurrentlyDialingOrExistingAddress struct {
+	Addr string
+}
+
+func (e ErrCurrentlyDialingOrExistingAddress) Error() string {
+	return fmt.Sprintf("connection with %s has been established or dialed", e.Addr)
+}
@@ -62,7 +62,7 @@ func (mp *Peer) Get(key string) interface{} {
func (mp *Peer) Set(key string, value interface{}) {
	mp.kv[key] = value
}
-func (mp *Peer) RemoteIP() net.IP              { return mp.ip }
-func (mp *Peer) OriginalAddr() *p2p.NetAddress { return mp.addr }
+func (mp *Peer) RemoteIP() net.IP            { return mp.ip }
+func (mp *Peer) SocketAddr() *p2p.NetAddress { return mp.addr }
func (mp *Peer) RemoteAddr() net.Addr        { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *Peer) CloseConn() error            { return nil }
@@ -23,14 +23,13 @@ func MaxNodeInfoSize() int {
// NodeInfo exposes basic info of a node
// and determines if we're compatible.
type NodeInfo interface {
+	ID() ID
	nodeInfoAddress
	nodeInfoTransport
}

// nodeInfoAddress exposes just the core info of a node.
type nodeInfoAddress interface {
-	ID() ID
-	NetAddress() *NetAddress
+	NetAddress() (*NetAddress, error)
}

// nodeInfoTransport validates a nodeInfo and checks

@@ -215,20 +214,9 @@ OUTER_LOOP:
// it includes the authenticated peer ID and the self-reported
// ListenAddr. Note that the ListenAddr is not authenticated and
// may not match that address actually dialed if its an outbound peer.
-func (info DefaultNodeInfo) NetAddress() *NetAddress {
+func (info DefaultNodeInfo) NetAddress() (*NetAddress, error) {
	idAddr := IDAddressString(info.ID(), info.ListenAddr)
-	netAddr, err := NewNetAddressString(idAddr)
-	if err != nil {
-		switch err.(type) {
-		case ErrNetAddressLookup:
-			// XXX If the peer provided a host name and the lookup fails here
-			// we're out of luck.
-			// TODO: use a NetAddress in DefaultNodeInfo
-		default:
-			panic(err) // everything should be well formed by now
-		}
-	}
-	return netAddr
+	return NewNetAddressString(idAddr)
}

//-----------------------------------------------------------
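With `NetAddress()` now returning `(*NetAddress, error)`, callers handle a failed address construction (for example, a failed DNS lookup on a self-reported hostname) instead of relying on the old internal panic. A sketch of the new call pattern, using simplified stand-in types rather than the real `p2p` ones:

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins: the real types live in the p2p package.
type NetAddress struct{ Addr string }

type nodeInfo struct{ listenAddr string }

// NetAddress returns an error instead of panicking, mirroring the new
// (*NetAddress, error) signature.
func (n nodeInfo) NetAddress() (*NetAddress, error) {
	if n.listenAddr == "" {
		return nil, errors.New("empty listen address")
	}
	return &NetAddress{Addr: n.listenAddr}, nil
}

func main() {
	for _, ni := range []nodeInfo{{"1.2.3.4:26656"}, {""}} {
		addr, err := ni.NetAddress()
		if err != nil {
			fmt.Println("skipping peer:", err) // no panic on bad addresses
			continue
		}
		fmt.Println("adding peer at", addr.Addr)
	}
}
```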
27  p2p/peer.go
@@ -29,7 +29,7 @@ type Peer interface {

	NodeInfo() NodeInfo // peer's info
	Status() tmconn.ConnectionStatus
-	OriginalAddr() *NetAddress // original address for outbound peers
+	SocketAddr() *NetAddress // actual address of the socket

	Send(byte, []byte) bool
	TrySend(byte, []byte) bool

@@ -46,7 +46,7 @@ type peerConn struct {
	persistent bool
	conn       net.Conn // source connection

-	originalAddr *NetAddress // nil for inbound connections
+	socketAddr *NetAddress

	// cached RemoteIP()
	ip net.IP

@@ -55,14 +55,14 @@ type peerConn struct {
func newPeerConn(
	outbound, persistent bool,
	conn net.Conn,
-	originalAddr *NetAddress,
+	socketAddr *NetAddress,
) peerConn {

	return peerConn{
-		outbound:     outbound,
-		persistent:   persistent,
-		conn:         conn,
-		originalAddr: originalAddr,
+		outbound:   outbound,
+		persistent: persistent,
+		conn:       conn,
+		socketAddr: socketAddr,
	}
}

@@ -223,13 +223,12 @@ func (p *peer) NodeInfo() NodeInfo {
	return p.nodeInfo
}

-// OriginalAddr returns the original address, which was used to connect with
-// the peer. Returns nil for inbound peers.
-func (p *peer) OriginalAddr() *NetAddress {
-	if p.peerConn.outbound {
-		return p.peerConn.originalAddr
-	}
-	return nil
+// SocketAddr returns the address of the socket.
+// For outbound peers, it's the address dialed (after DNS resolution).
+// For inbound peers, it's the address returned by the underlying connection
+// (not what's reported in the peer's NodeInfo).
+func (p *peer) SocketAddr() *NetAddress {
+	return p.peerConn.socketAddr
}

// Status returns the peer's ConnectionStatus.
@@ -29,7 +29,7 @@ func (mp *mockPeer) IsPersistent() bool { return true }
func (mp *mockPeer) Get(s string) interface{}  { return s }
func (mp *mockPeer) Set(string, interface{})   {}
func (mp *mockPeer) RemoteIP() net.IP          { return mp.ip }
-func (mp *mockPeer) OriginalAddr() *NetAddress { return nil }
+func (mp *mockPeer) SocketAddr() *NetAddress   { return nil }
func (mp *mockPeer) RemoteAddr() net.Addr      { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
func (mp *mockPeer) CloseConn() error          { return nil }
@@ -109,25 +109,27 @@ func testOutboundPeerConn(
	persistent bool,
	ourNodePrivKey crypto.PrivKey,
) (peerConn, error) {
+	var pc peerConn
	conn, err := testDial(addr, config)
	if err != nil {
-		return peerConn{}, cmn.ErrorWrap(err, "Error creating peer")
+		return pc, cmn.ErrorWrap(err, "Error creating peer")
	}

-	pc, err := testPeerConn(conn, config, true, persistent, ourNodePrivKey, addr)
+	pc, err = testPeerConn(conn, config, true, persistent, ourNodePrivKey, addr)
	if err != nil {
		if cerr := conn.Close(); cerr != nil {
-			return peerConn{}, cmn.ErrorWrap(err, cerr.Error())
+			return pc, cmn.ErrorWrap(err, cerr.Error())
		}
-		return peerConn{}, err
+		return pc, err
	}

	// ensure dialed ID matches connection ID
	if addr.ID != pc.ID() {
		if cerr := conn.Close(); cerr != nil {
-			return peerConn{}, cmn.ErrorWrap(err, cerr.Error())
+			return pc, cmn.ErrorWrap(err, cerr.Error())
		}
-		return peerConn{}, ErrSwitchAuthenticationFailure{addr, pc.ID()}
+		return pc, ErrSwitchAuthenticationFailure{addr, pc.ID()}
	}

	return pc, nil
@@ -55,7 +55,7 @@ type AddrBook interface {
 	PickAddress(biasTowardsNewAddrs int) *p2p.NetAddress
 
 	// Mark address
-	MarkGood(*p2p.NetAddress)
+	MarkGood(p2p.ID)
 	MarkAttempt(*p2p.NetAddress)
 	MarkBad(*p2p.NetAddress)
 
@@ -66,8 +66,7 @@ type AddrBook interface {
 	// Send a selection of addresses with bias
 	GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddress
 
-	// TODO: remove
-	ListOfKnownAddresses() []*knownAddress
 	Size() int
 
 	// Persist to disk
 	Save()
@@ -254,7 +253,7 @@ func (a *addrBook) PickAddress(biasTowardsNewAddrs int) *p2p.NetAddress {
 	bookSize := a.size()
 	if bookSize <= 0 {
 		if bookSize < 0 {
-			a.Logger.Error("Addrbook size less than 0", "nNew", a.nNew, "nOld", a.nOld)
+			panic(fmt.Sprintf("Addrbook size %d (new: %d + old: %d) is less than 0", a.nNew+a.nOld, a.nNew, a.nOld))
 		}
 		return nil
 	}
@@ -297,11 +296,11 @@ func (a *addrBook) PickAddress(biasTowardsNewAddrs int) *p2p.NetAddress {
 
 // MarkGood implements AddrBook - it marks the peer as good and
 // moves it into an "old" bucket.
-func (a *addrBook) MarkGood(addr *p2p.NetAddress) {
+func (a *addrBook) MarkGood(id p2p.ID) {
 	a.mtx.Lock()
 	defer a.mtx.Unlock()
 
-	ka := a.addrLookup[addr.ID]
+	ka := a.addrLookup[id]
 	if ka == nil {
 		return
 	}
@@ -339,7 +338,7 @@ func (a *addrBook) GetSelection() []*p2p.NetAddress {
 	bookSize := a.size()
 	if bookSize <= 0 {
 		if bookSize < 0 {
-			a.Logger.Error("Addrbook size less than 0", "nNew", a.nNew, "nOld", a.nOld)
+			panic(fmt.Sprintf("Addrbook size %d (new: %d + old: %d) is less than 0", a.nNew+a.nOld, a.nNew, a.nOld))
 		}
 		return nil
 	}
@@ -389,7 +388,7 @@ func (a *addrBook) GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddre
 	bookSize := a.size()
 	if bookSize <= 0 {
 		if bookSize < 0 {
-			a.Logger.Error("Addrbook size less than 0", "nNew", a.nNew, "nOld", a.nOld)
+			panic(fmt.Sprintf("Addrbook size %d (new: %d + old: %d) is less than 0", a.nNew+a.nOld, a.nNew, a.nOld))
 		}
 		return nil
 	}
@@ -414,18 +413,6 @@ func (a *addrBook) GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddre
 	return selection
 }
 
-// ListOfKnownAddresses returns the new and old addresses.
-func (a *addrBook) ListOfKnownAddresses() []*knownAddress {
-	a.mtx.Lock()
-	defer a.mtx.Unlock()
-
-	addrs := []*knownAddress{}
-	for _, addr := range a.addrLookup {
-		addrs = append(addrs, addr.copy())
-	}
-	return addrs
-}
-
 //------------------------------------------------
 
 // Size returns the number of addresses in the book.
@@ -473,8 +460,7 @@ func (a *addrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAd
 	case bucketTypeOld:
 		return a.bucketsOld[bucketIdx]
 	default:
-		cmn.PanicSanity("Should not happen")
-		return nil
+		panic("Invalid bucket type")
 	}
 }
@@ -600,23 +586,10 @@ func (a *addrBook) addAddress(addr, src *p2p.NetAddress) error {
 		return ErrAddrBookNilAddr{addr, src}
 	}
 
-	if a.routabilityStrict && !addr.Routable() {
-		return ErrAddrBookNonRoutable{addr}
-	}
-
-	if !addr.Valid() {
-		return ErrAddrBookInvalidAddr{addr}
-	}
-
-	if !addr.HasID() {
-		return ErrAddrBookInvalidAddrNoID{addr}
-	}
-
-	// TODO: we should track ourAddrs by ID and by IP:PORT and refuse both.
-	if _, ok := a.ourAddrs[addr.String()]; ok {
-		return ErrAddrBookSelf{addr}
-	}
-
 	if _, ok := a.privateIDs[addr.ID]; ok {
 		return ErrAddrBookPrivate{addr}
 	}
 
@@ -625,6 +598,19 @@ func (a *addrBook) addAddress(addr, src *p2p.NetAddress) error {
 		return ErrAddrBookPrivateSrc{src}
 	}
 
+	// TODO: we should track ourAddrs by ID and by IP:PORT and refuse both.
+	if _, ok := a.ourAddrs[addr.String()]; ok {
+		return ErrAddrBookSelf{addr}
+	}
+
+	if a.routabilityStrict && !addr.Routable() {
+		return ErrAddrBookNonRoutable{addr}
+	}
+
+	if !addr.Valid() {
+		return ErrAddrBookInvalidAddr{addr}
+	}
+
 	ka := a.addrLookup[addr.ID]
 	if ka != nil {
 		// If its already old and the addr is the same, ignore it.
@@ -41,7 +41,7 @@ func TestAddrBookPickAddress(t *testing.T) {
 	assert.NotNil(t, addr, "expected an address")
 
 	// pick an address when we only have old address
-	book.MarkGood(addrSrc.addr)
+	book.MarkGood(addrSrc.addr.ID)
 	addr = book.PickAddress(0)
 	assert.NotNil(t, addr, "expected an address")
 	addr = book.PickAddress(50)
 
@@ -126,7 +126,7 @@ func TestAddrBookPromoteToOld(t *testing.T) {
 	// Promote half of them
 	for i, addrSrc := range randAddrs {
 		if i%2 == 0 {
-			book.MarkGood(addrSrc.addr)
+			book.MarkGood(addrSrc.addr.ID)
 		}
 	}
 
@@ -330,7 +330,7 @@ func TestAddrBookGetSelectionWithBias(t *testing.T) {
 	randAddrsLen := len(randAddrs)
 	for i, addrSrc := range randAddrs {
 		if int((float64(i)/float64(randAddrsLen))*100) >= 20 {
-			book.MarkGood(addrSrc.addr)
+			book.MarkGood(addrSrc.addr.ID)
 		}
 	}
 
@@ -569,7 +569,7 @@ func createAddrBookWithMOldAndNNewAddrs(t *testing.T, nOld, nNew int) (book *add
 	randAddrs := randNetAddressPairs(t, nOld)
 	for _, addr := range randAddrs {
 		book.AddAddress(addr.addr, addr.src)
-		book.MarkGood(addr.addr)
+		book.MarkGood(addr.addr.ID)
 	}
 
 	randAddrs = randNetAddressPairs(t, nNew)
@@ -30,6 +30,10 @@ func (err ErrAddrBookPrivate) Error() string {
 	return fmt.Sprintf("Cannot add private peer with address %v", err.Addr)
 }
 
+func (err ErrAddrBookPrivate) PrivateAddr() bool {
+	return true
+}
+
 type ErrAddrBookPrivateSrc struct {
 	Src *p2p.NetAddress
 }
 
@@ -38,6 +42,10 @@ func (err ErrAddrBookPrivateSrc) Error() string {
 	return fmt.Sprintf("Cannot add peer coming from private peer with address %v", err.Src)
 }
 
+func (err ErrAddrBookPrivateSrc) PrivateAddr() bool {
+	return true
+}
+
 type ErrAddrBookNilAddr struct {
 	Addr *p2p.NetAddress
 	Src  *p2p.NetAddress
@@ -16,16 +16,15 @@ type addrBookJSON struct {
 }
 
 func (a *addrBook) saveToFile(filePath string) {
-	a.Logger.Info("Saving AddrBook to file", "size", a.Size())
-
 	a.mtx.Lock()
 	defer a.mtx.Unlock()
-	// Compile Addrs
-	addrs := []*knownAddress{}
+
+	a.Logger.Info("Saving AddrBook to file", "size", a.size())
+
+	addrs := make([]*knownAddress, 0, len(a.addrLookup))
 	for _, ka := range a.addrLookup {
 		addrs = append(addrs, ka)
 	}
 
 	aJSON := &addrBookJSON{
 		Key:   a.key,
 		Addrs: addrs,
@@ -33,18 +33,6 @@ func (ka *knownAddress) ID() p2p.ID {
 	return ka.Addr.ID
 }
 
-func (ka *knownAddress) copy() *knownAddress {
-	return &knownAddress{
-		Addr:        ka.Addr,
-		Src:         ka.Src,
-		Attempts:    ka.Attempts,
-		LastAttempt: ka.LastAttempt,
-		LastSuccess: ka.LastSuccess,
-		BucketType:  ka.BucketType,
-		Buckets:     ka.Buckets,
-	}
-}
-
 func (ka *knownAddress) isOld() bool {
 	return ka.BucketType == bucketTypeOld
 }
@@ -3,7 +3,6 @@ package pex
 import (
 	"fmt"
 	"reflect"
-	"sort"
 	"sync"
 	"time"
@@ -35,16 +34,11 @@ const (
 
 	// Seed/Crawler constants
 
-	// We want seeds to only advertise good peers. Therefore they should wait at
-	// least as long as we expect it to take for a peer to become good before
-	// disconnecting.
-	// see consensus/reactor.go: blocksToContributeToBecomeGoodPeer
-	// 10000 blocks assuming 1s blocks ~ 2.7 hours.
-	defaultSeedDisconnectWaitPeriod = 3 * time.Hour
+	// minTimeBetweenCrawls is a minimum time between attempts to crawl a peer.
+	minTimeBetweenCrawls = 2 * time.Minute
 
-	defaultCrawlPeerInterval = 2 * time.Minute // don't redial for this. TODO: back-off. what for?
-
-	defaultCrawlPeersPeriod = 30 * time.Second // check some peers every this
+	// check some peers every this
+	crawlPeerPeriod = 30 * time.Second
 
 	maxAttemptsToDial = 16 // ~ 35h in total (last attempt - 18h)
@@ -77,6 +71,9 @@ type PEXReactor struct {
 	seedAddrs []*p2p.NetAddress
 
 	attemptsToDial sync.Map // address (string) -> {number of attempts (int), last time dialed (time.Time)}
+
+	// seed/crawled mode fields
+	crawlPeerInfos map[p2p.ID]crawlPeerInfo
 }
 
 func (r *PEXReactor) minReceiveRequestInterval() time.Duration {
@@ -90,6 +87,11 @@ type PEXReactorConfig struct {
 	// Seed/Crawler mode
 	SeedMode bool
 
+	// We want seeds to only advertise good peers. Therefore they should wait at
+	// least as long as we expect it to take for a peer to become good before
+	// disconnecting.
+	SeedDisconnectWaitPeriod time.Duration
+
 	// Seeds is a list of addresses reactor may use
 	// if it can't connect to peers in the addrbook.
 	Seeds []string
@@ -108,6 +110,7 @@ func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor {
 		ensurePeersPeriod:    defaultEnsurePeersPeriod,
 		requestsSent:         cmn.NewCMap(),
 		lastReceivedRequests: cmn.NewCMap(),
+		crawlPeerInfos:       make(map[p2p.ID]crawlPeerInfo),
 	}
 	r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
 	return r
@@ -167,12 +170,18 @@ func (r *PEXReactor) AddPeer(p Peer) {
 		}
 	} else {
 		// inbound peer is its own source
-		addr := p.NodeInfo().NetAddress()
+		addr, err := p.NodeInfo().NetAddress()
+		if err != nil {
+			r.Logger.Error("Failed to get peer NetAddress", "err", err, "peer", p)
+			return
+		}
+
+		// Make it explicit that addr and src are the same for an inbound peer.
 		src := addr
 
 		// add to book. dont RequestAddrs right away because
 		// we don't trust inbound as much - let ensurePeersRoutine handle it.
-		err := r.book.AddAddress(addr, src)
+		err = r.book.AddAddress(addr, src)
 		r.logErrAddrBook(err)
 	}
 }
@@ -309,7 +318,10 @@ func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error {
 	}
 	r.requestsSent.Delete(id)
 
-	srcAddr := src.NodeInfo().NetAddress()
+	srcAddr, err := src.NodeInfo().NetAddress()
+	if err != nil {
+		return err
+	}
 	for _, netAddr := range addrs {
 		// Validate netAddr. Disconnect from a peer if it sends us invalid data.
 		if netAddr == nil {
@@ -363,9 +375,9 @@ func (r *PEXReactor) ensurePeersRoutine() {
 	)
 
 	// Randomize first round of communication to avoid thundering herd.
-	// If no potential peers are present directly start connecting so we guarantee
-	// swift setup with the help of configured seeds.
-	if r.hasPotentialPeers() {
+	// If no peers are present directly start connecting so we guarantee swift
+	// setup with the help of configured seeds.
+	if r.nodeHasSomePeersOrDialingAny() {
 		time.Sleep(time.Duration(jitter))
 	}
@@ -493,23 +505,26 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) {
 
 	err := r.Switch.DialPeerWithAddress(addr, false)
 	if err != nil {
+		if _, ok := err.(p2p.ErrCurrentlyDialingOrExistingAddress); ok {
+			return
+		}
+
 		r.Logger.Error("Dialing failed", "addr", addr, "err", err, "attempts", attempts)
-		// TODO: detect more "bad peer" scenarios
+		markAddrInBookBasedOnErr(addr, r.book, err)
 		if _, ok := err.(p2p.ErrSwitchAuthenticationFailure); ok {
-			r.book.MarkBad(addr)
 			r.attemptsToDial.Delete(addr.DialString())
 		} else {
-			r.book.MarkAttempt(addr)
-			// record attempt
+			// FIXME: if the addr is going to be removed from the addrbook (hard to
+			// tell at this point), we need to Delete it from attemptsToDial, not
+			// record another attempt.
 			r.attemptsToDial.Store(addr.DialString(), _attemptsToDial{attempts + 1, time.Now()})
 		}
-	} else {
-		// cleanup any history
-		r.attemptsToDial.Delete(addr.DialString())
+		return
 	}
+
+	// cleanup any history
+	r.attemptsToDial.Delete(addr.DialString())
 }
 
 // checkSeeds checks that addresses are well formed.
@@ -568,101 +583,92 @@ func (r *PEXReactor) AttemptsToDial(addr *p2p.NetAddress) int {
 // from peers, except other seed nodes.
 func (r *PEXReactor) crawlPeersRoutine() {
 	// Do an initial crawl
-	r.crawlPeers()
+	r.crawlPeers(r.book.GetSelection())
 
 	// Fire periodically
-	ticker := time.NewTicker(defaultCrawlPeersPeriod)
+	ticker := time.NewTicker(crawlPeerPeriod)
 
 	for {
 		select {
 		case <-ticker.C:
 			r.attemptDisconnects()
-			r.crawlPeers()
+			r.crawlPeers(r.book.GetSelection())
+			r.cleanupCrawlPeerInfos()
 		case <-r.Quit():
 			return
 		}
 	}
 }
 
-// hasPotentialPeers indicates if there is a potential peer to connect to, by
-// consulting the Switch as well as the AddrBook.
-func (r *PEXReactor) hasPotentialPeers() bool {
+// nodeHasSomePeersOrDialingAny returns true if the node is connected to some
+// peers or dialing them currently.
+func (r *PEXReactor) nodeHasSomePeersOrDialingAny() bool {
 	out, in, dial := r.Switch.NumPeers()
-	return out+in+dial > 0 && len(r.book.ListOfKnownAddresses()) > 0
+	return out+in+dial > 0
 }
 
-// crawlPeerInfo handles temporary data needed for the
-// network crawling performed during seed/crawler mode.
+// crawlPeerInfo handles temporary data needed for the network crawling
+// performed during seed/crawler mode.
 type crawlPeerInfo struct {
-	// The listening address of a potential peer we learned about
-	Addr *p2p.NetAddress
-
-	// The last time we attempt to reach this address
-	LastAttempt time.Time
-
-	// The last time we successfully reached this address
-	LastSuccess time.Time
+	Addr *p2p.NetAddress `json:"addr"`
+	// The last time we crawled the peer or attempted to do so.
+	LastCrawled time.Time `json:"last_crawled"`
 }
 
-// oldestFirst implements sort.Interface for []crawlPeerInfo
-// based on the LastAttempt field.
-type oldestFirst []crawlPeerInfo
-
-func (of oldestFirst) Len() int           { return len(of) }
-func (of oldestFirst) Swap(i, j int)      { of[i], of[j] = of[j], of[i] }
-func (of oldestFirst) Less(i, j int) bool { return of[i].LastAttempt.Before(of[j].LastAttempt) }
-
-// getPeersToCrawl returns addresses of potential peers that we wish to validate.
-// NOTE: The status information is ordered as described above.
-func (r *PEXReactor) getPeersToCrawl() []crawlPeerInfo {
-	// TODO: be more selective
-	addrs := r.book.ListOfKnownAddresses()
-	of := make(oldestFirst, 0, len(addrs))
-	for _, addr := range addrs {
-		if len(addr.ID()) == 0 {
-			continue // dont use peers without id
-		}
-
-		of = append(of, crawlPeerInfo{
-			Addr:        addr.Addr,
-			LastAttempt: addr.LastAttempt,
-			LastSuccess: addr.LastSuccess,
-		})
-	}
-	sort.Sort(of)
-	return of
-}
-
-// crawlPeers will crawl the network looking for new peer addresses. (once)
-func (r *PEXReactor) crawlPeers() {
-	peerInfos := r.getPeersToCrawl()
-
+// crawlPeers will crawl the network looking for new peer addresses.
+func (r *PEXReactor) crawlPeers(addrs []*p2p.NetAddress) {
 	now := time.Now()
-	// Use addresses we know of to reach additional peers
-	for _, pi := range peerInfos {
-		// Do not attempt to connect with peers we recently dialed
-		if now.Sub(pi.LastAttempt) < defaultCrawlPeerInterval {
+
+	for _, addr := range addrs {
+		peerInfo, ok := r.crawlPeerInfos[addr.ID]
+
+		// Do not attempt to connect with peers we recently crawled.
+		if ok && now.Sub(peerInfo.LastCrawled) < minTimeBetweenCrawls {
 			continue
 		}
-		// Otherwise, attempt to connect with the known address
-		err := r.Switch.DialPeerWithAddress(pi.Addr, false)
+
+		// Record crawling attempt.
+		r.crawlPeerInfos[addr.ID] = crawlPeerInfo{
+			Addr:        addr,
+			LastCrawled: now,
+		}
+
+		err := r.Switch.DialPeerWithAddress(addr, false)
 		if err != nil {
-			r.book.MarkAttempt(pi.Addr)
+			if _, ok := err.(p2p.ErrCurrentlyDialingOrExistingAddress); ok {
+				continue
+			}
+
+			r.Logger.Error("Dialing failed", "addr", addr, "err", err)
+			markAddrInBookBasedOnErr(addr, r.book, err)
 			continue
 		}
-		// Ask for more addresses
-		peer := r.Switch.Peers().Get(pi.Addr.ID)
+
+		peer := r.Switch.Peers().Get(addr.ID)
 		if peer != nil {
 			r.RequestAddrs(peer)
 		}
 	}
 }
 
+func (r *PEXReactor) cleanupCrawlPeerInfos() {
+	for id, info := range r.crawlPeerInfos {
+		// If we did not crawl a peer for 24 hours, it means the peer was removed
+		// from the addrbook => remove
+		//
+		// 10000 addresses / maxGetSelection = 40 cycles to get all addresses in
+		// the ideal case,
+		// 40 * crawlPeerPeriod ~ 20 minutes
+		if time.Since(info.LastCrawled) > 24*time.Hour {
+			delete(r.crawlPeerInfos, id)
+		}
+	}
+}
+
 // attemptDisconnects checks if we've been with each peer long enough to disconnect
 func (r *PEXReactor) attemptDisconnects() {
 	for _, peer := range r.Switch.Peers().List() {
-		if peer.Status().Duration < defaultSeedDisconnectWaitPeriod {
+		if peer.Status().Duration < r.config.SeedDisconnectWaitPeriod {
 			continue
 		}
 		if peer.IsPersistent() {
|
||||
}
|
||||
}
|
||||
|
||||
func markAddrInBookBasedOnErr(addr *p2p.NetAddress, book AddrBook, err error) {
|
||||
// TODO: detect more "bad peer" scenarios
|
||||
switch err.(type) {
|
||||
case p2p.ErrSwitchAuthenticationFailure:
|
||||
book.MarkBad(addr)
|
||||
default:
|
||||
book.MarkAttempt(addr)
|
||||
}
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Messages
|
||||
|
||||
|
@@ -96,7 +96,7 @@ func TestPEXReactorRunning(t *testing.T) {
 	}
 
 	addOtherNodeAddrToAddrBook := func(switchIndex, otherSwitchIndex int) {
-		addr := switches[otherSwitchIndex].NodeInfo().NetAddress()
+		addr := switches[otherSwitchIndex].NetAddress()
 		books[switchIndex].AddAddress(addr, addr)
 	}
@@ -127,7 +127,7 @@ func TestPEXReactorReceive(t *testing.T) {
 	r.RequestAddrs(peer)
 
 	size := book.Size()
-	addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
+	addrs := []*p2p.NetAddress{peer.SocketAddr()}
 	msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs})
 	r.Receive(PexChannel, peer, msg)
 	assert.Equal(t, size+1, book.Size())
@@ -184,7 +184,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) {
 	assert.True(t, r.requestsSent.Has(id))
 	assert.True(t, sw.Peers().Has(peer.ID()))
 
-	addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
+	addrs := []*p2p.NetAddress{peer.SocketAddr()}
 	msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs})
 
 	// receive some addrs. should clear the request
@@ -204,36 +204,36 @@ func TestCheckSeeds(t *testing.T) {
 	defer os.RemoveAll(dir) // nolint: errcheck
 
 	// 1. test creating peer with no seeds works
-	peer := testCreateDefaultPeer(dir, 0)
-	require.Nil(t, peer.Start())
-	peer.Stop()
+	peerSwitch := testCreateDefaultPeer(dir, 0)
+	require.Nil(t, peerSwitch.Start())
+	peerSwitch.Stop()
 
 	// 2. create seed
 	seed := testCreateSeed(dir, 1, []*p2p.NetAddress{}, []*p2p.NetAddress{})
 
 	// 3. test create peer with online seed works
-	peer = testCreatePeerWithSeed(dir, 2, seed)
-	require.Nil(t, peer.Start())
-	peer.Stop()
+	peerSwitch = testCreatePeerWithSeed(dir, 2, seed)
+	require.Nil(t, peerSwitch.Start())
+	peerSwitch.Stop()
 
 	// 4. test create peer with all seeds having unresolvable DNS fails
 	badPeerConfig := &PEXReactorConfig{
 		Seeds: []string{"ed3dfd27bfc4af18f67a49862f04cc100696e84d@bad.network.addr:26657",
 			"d824b13cb5d40fa1d8a614e089357c7eff31b670@anotherbad.network.addr:26657"},
 	}
-	peer = testCreatePeerWithConfig(dir, 2, badPeerConfig)
-	require.Error(t, peer.Start())
-	peer.Stop()
+	peerSwitch = testCreatePeerWithConfig(dir, 2, badPeerConfig)
+	require.Error(t, peerSwitch.Start())
+	peerSwitch.Stop()
 
 	// 5. test create peer with one good seed address succeeds
 	badPeerConfig = &PEXReactorConfig{
 		Seeds: []string{"ed3dfd27bfc4af18f67a49862f04cc100696e84d@bad.network.addr:26657",
 			"d824b13cb5d40fa1d8a614e089357c7eff31b670@anotherbad.network.addr:26657",
-			seed.NodeInfo().NetAddress().String()},
+			seed.NetAddress().String()},
 	}
-	peer = testCreatePeerWithConfig(dir, 2, badPeerConfig)
-	require.Nil(t, peer.Start())
-	peer.Stop()
+	peerSwitch = testCreatePeerWithConfig(dir, 2, badPeerConfig)
+	require.Nil(t, peerSwitch.Start())
+	peerSwitch.Stop()
 }
 
 func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) {
@@ -263,12 +263,13 @@ func TestConnectionSpeedForPeerReceivedFromSeed(t *testing.T) {
 	defer os.RemoveAll(dir) // nolint: errcheck
 
 	// 1. create peer
-	peer := testCreateDefaultPeer(dir, 1)
-	require.Nil(t, peer.Start())
-	defer peer.Stop()
+	peerSwitch := testCreateDefaultPeer(dir, 1)
+	require.Nil(t, peerSwitch.Start())
+	defer peerSwitch.Stop()
 
 	// 2. Create seed which knows about the peer
-	seed := testCreateSeed(dir, 2, []*p2p.NetAddress{peer.NodeInfo().NetAddress()}, []*p2p.NetAddress{peer.NodeInfo().NetAddress()})
+	peerAddr := peerSwitch.NetAddress()
+	seed := testCreateSeed(dir, 2, []*p2p.NetAddress{peerAddr}, []*p2p.NetAddress{peerAddr})
 	require.Nil(t, seed.Start())
 	defer seed.Stop()
@@ -284,31 +285,41 @@ func TestConnectionSpeedForPeerReceivedFromSeed(t *testing.T) {
 	assertPeersWithTimeout(t, []*p2p.Switch{secondPeer}, 10*time.Millisecond, 1*time.Second, 2)
 }
 
-func TestPEXReactorCrawlStatus(t *testing.T) {
-	pexR, book := createReactor(&PEXReactorConfig{SeedMode: true})
+func TestPEXReactorSeedMode(t *testing.T) {
+	// directory to store address books
+	dir, err := ioutil.TempDir("", "pex_reactor")
+	require.Nil(t, err)
+	defer os.RemoveAll(dir) // nolint: errcheck
+
+	pexR, book := createReactor(&PEXReactorConfig{SeedMode: true, SeedDisconnectWaitPeriod: 10 * time.Millisecond})
 	defer teardownReactor(book)
 
 	// Seed/Crawler mode uses data from the Switch
 	sw := createSwitchAndAddReactors(pexR)
 	sw.SetAddrBook(book)
+	err = sw.Start()
+	require.NoError(t, err)
+	defer sw.Stop()
 
-	// Create a peer, add it to the peer set and the addrbook.
-	peer := p2p.CreateRandomPeer(false)
-	p2p.AddPeerToSwitch(pexR.Switch, peer)
-	addr1 := peer.NodeInfo().NetAddress()
-	pexR.book.AddAddress(addr1, addr1)
+	assert.Zero(t, sw.Peers().Size())
 
-	// Add a non-connected address to the book.
-	_, addr2 := p2p.CreateRoutableAddr()
-	pexR.book.AddAddress(addr2, addr1)
+	peerSwitch := testCreateDefaultPeer(dir, 1)
+	require.NoError(t, peerSwitch.Start())
+	defer peerSwitch.Stop()
 
-	// Get some peerInfos to crawl
-	peerInfos := pexR.getPeersToCrawl()
+	// 1. Test crawlPeers dials the peer
+	pexR.crawlPeers([]*p2p.NetAddress{peerSwitch.NetAddress()})
+	assert.Equal(t, 1, sw.Peers().Size())
+	assert.True(t, sw.Peers().Has(peerSwitch.NodeInfo().ID()))
 
-	// Make sure it has the proper number of elements
-	assert.Equal(t, 2, len(peerInfos))
+	// 2. attemptDisconnects should not disconnect because of wait period
+	pexR.attemptDisconnects()
+	assert.Equal(t, 1, sw.Peers().Size())
 
-	// TODO: test
+	time.Sleep(100 * time.Millisecond)
+
+	// 3. attemptDisconnects should disconnect after wait period
+	pexR.attemptDisconnects()
+	assert.Equal(t, 0, sw.Peers().Size())
 }
 
 // connect a peer to a seed, wait a bit, then stop it.
@@ -359,7 +370,7 @@ func TestPEXReactorSeedModeFlushStop(t *testing.T) {
 	reactor := switches[0].Reactors()["pex"].(*PEXReactor)
 	peerID := switches[1].NodeInfo().ID()
 
-	err = switches[1].DialPeerWithAddress(switches[0].NodeInfo().NetAddress(), false)
+	err = switches[1].DialPeerWithAddress(switches[0].NetAddress(), false)
 	assert.NoError(t, err)
 
 	// sleep up to a second while waiting for the peer to send us a message.
@@ -397,7 +408,7 @@ func TestPEXReactorDoesNotAddPrivatePeersToAddrBook(t *testing.T) {
 	pexR.RequestAddrs(peer)
 
 	size := book.Size()
-	addrs := []*p2p.NetAddress{peer.NodeInfo().NetAddress()}
+	addrs := []*p2p.NetAddress{peer.SocketAddr()}
 	msg := cdc.MustMarshalBinaryBare(&pexAddrsMessage{Addrs: addrs})
 	pexR.Receive(PexChannel, peer, msg)
 	assert.Equal(t, size, book.Size())
@@ -414,7 +425,7 @@ func TestPEXReactorDialPeer(t *testing.T) {
 	sw.SetAddrBook(book)
 
 	peer := mock.NewPeer(nil)
-	addr := peer.NodeInfo().NetAddress()
+	addr := peer.SocketAddr()
 
 	assert.Equal(t, 0, pexR.AttemptsToDial(addr))
@@ -528,7 +539,7 @@ func testCreateSeed(dir string, id int, knownAddrs, srcAddrs []*p2p.NetAddress)
 	book.SetLogger(log.TestingLogger())
 	for j := 0; j < len(knownAddrs); j++ {
 		book.AddAddress(knownAddrs[j], srcAddrs[j])
-		book.MarkGood(knownAddrs[j])
+		book.MarkGood(knownAddrs[j].ID)
 	}
 	sw.SetAddrBook(book)
@@ -547,7 +558,7 @@ func testCreateSeed(dir string, id int, knownAddrs, srcAddrs []*p2p.NetAddress)
 // Starting and stopping the peer is left to the caller
 func testCreatePeerWithSeed(dir string, id int, seed *p2p.Switch) *p2p.Switch {
 	conf := &PEXReactorConfig{
-		Seeds: []string{seed.NodeInfo().NetAddress().String()},
+		Seeds: []string{seed.NetAddress().String()},
 	}
 	return testCreatePeerWithConfig(dir, id, conf)
 }
@@ -6,6 +6,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/pkg/errors"
+
 	"github.com/tendermint/tendermint/config"
 	cmn "github.com/tendermint/tendermint/libs/common"
 	"github.com/tendermint/tendermint/p2p/conn"
@@ -46,7 +48,7 @@ type AddrBook interface {
 	AddAddress(addr *NetAddress, src *NetAddress) error
 	AddOurAddress(*NetAddress)
 	OurAddress(*NetAddress) bool
-	MarkGood(*NetAddress)
+	MarkGood(ID)
 	RemoveAddress(*NetAddress)
 	HasAddress(*NetAddress) bool
 	Save()
@@ -86,6 +88,12 @@ type Switch struct {
 	metrics *Metrics
 }
 
+// NetAddress returns the address the switch is listening on.
+func (sw *Switch) NetAddress() *NetAddress {
+	addr := sw.transport.NetAddress()
+	return &addr
+}
+
 // SwitchOption sets an optional parameter on the Switch.
 type SwitchOption func(*Switch)
@@ -289,13 +297,7 @@ func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) {
 	sw.stopAndRemovePeer(peer, reason)
 
 	if peer.IsPersistent() {
-		addr := peer.OriginalAddr()
-		if addr == nil {
-			// FIXME: persistent peers can't be inbound right now.
-			// self-reported address for inbound persistent peers
-			addr = peer.NodeInfo().NetAddress()
-		}
-		go sw.reconnectToPeer(addr)
+		go sw.reconnectToPeer(peer.SocketAddr())
 	}
 }
@@ -339,14 +341,11 @@ func (sw *Switch) reconnectToPeer(addr *NetAddress) {
 		return
 	}
 
-	if sw.IsDialingOrExistingAddress(addr) {
-		sw.Logger.Debug("Peer connection has been established or dialed while we waiting next try", "addr", addr)
-		return
-	}
-
 	err := sw.DialPeerWithAddress(addr, true)
 	if err == nil {
 		return // success
+	} else if _, ok := err.(ErrCurrentlyDialingOrExistingAddress); ok {
+		return
 	}
 
 	sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
@@ -365,9 +364,12 @@ func (sw *Switch) reconnectToPeer(addr *NetAddress) {
 		// sleep an exponentially increasing amount
 		sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
 		sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second)
+
 		err := sw.DialPeerWithAddress(addr, true)
 		if err == nil {
 			return // success
+		} else if _, ok := err.(ErrCurrentlyDialingOrExistingAddress); ok {
+			return
 		}
 		sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "err", err, "addr", addr)
 	}
@@ -383,13 +385,22 @@ func (sw *Switch) SetAddrBook(addrBook AddrBook) {
 // like contributed to consensus.
 func (sw *Switch) MarkPeerAsGood(peer Peer) {
 	if sw.addrBook != nil {
-		sw.addrBook.MarkGood(peer.NodeInfo().NetAddress())
+		sw.addrBook.MarkGood(peer.ID())
 	}
 }
 
 //---------------------------------------------------------------------
 // Dialing
 
+type privateAddr interface {
+	PrivateAddr() bool
+}
+
+func isPrivateAddr(err error) bool {
+	te, ok := errors.Cause(err).(privateAddr)
+	return ok && te.PrivateAddr()
+}
+
 // DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent).
 // Used to dial peers from config on startup or from unsafe-RPC (trusted sources).
 // TODO: remove addrBook arg since it's now set on the switch
@@ -400,7 +411,7 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b
|
||||
sw.Logger.Error("Error in peer's address", "err", err)
|
||||
}
|
||||
|
||||
ourAddr := sw.nodeInfo.NetAddress()
|
||||
ourAddr := sw.NetAddress()
|
||||
|
||||
// TODO: this code feels like it's in the wrong place.
|
||||
// The integration tests depend on the addrBook being saved
|
||||
@@ -412,7 +423,11 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b
|
||||
// do not add our address or ID
|
||||
if !netAddr.Same(ourAddr) {
|
||||
if err := addrBook.AddAddress(netAddr, ourAddr); err != nil {
|
||||
sw.Logger.Error("Can't add peer's address to addrbook", "err", err)
|
||||
if isPrivateAddr(err) {
|
||||
sw.Logger.Debug("Won't add peer's address to addrbook", "err", err)
|
||||
} else {
|
||||
sw.Logger.Error("Can't add peer's address to addrbook", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -435,15 +450,10 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b
|
||||
|
||||
sw.randomSleep(0)
|
||||
|
||||
if sw.IsDialingOrExistingAddress(addr) {
|
||||
sw.Logger.Debug("Ignore attempt to connect to an existing peer", "addr", addr)
|
||||
return
|
||||
}
|
||||
|
||||
err := sw.DialPeerWithAddress(addr, persistent)
|
||||
if err != nil {
|
||||
switch err.(type) {
|
||||
case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID:
|
||||
case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID, ErrCurrentlyDialingOrExistingAddress:
|
||||
sw.Logger.Debug("Error dialing peer", "err", err)
|
||||
default:
|
||||
sw.Logger.Error("Error dialing peer", "err", err)
|
||||
@@ -454,11 +464,20 @@ func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent b
|
||||
return nil
|
||||
}
|
||||
|
||||
// DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects and authenticates successfully.
|
||||
// If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails.
|
||||
// DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects
|
||||
// and authenticates successfully.
|
||||
// If `persistent == true`, the switch will always try to reconnect to this
|
||||
// peer if the connection ever fails.
|
||||
// If we're currently dialing this address or it belongs to an existing peer,
|
||||
// ErrCurrentlyDialingOrExistingAddress is returned.
|
||||
func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) error {
|
||||
if sw.IsDialingOrExistingAddress(addr) {
|
||||
return ErrCurrentlyDialingOrExistingAddress{addr.String()}
|
||||
}
|
||||
|
||||
sw.dialing.Set(string(addr.ID), addr)
|
||||
defer sw.dialing.Delete(string(addr.ID))
|
||||
|
||||
return sw.addOutboundPeerWithConfig(addr, sw.config, persistent)
|
||||
}
|
||||
|
||||
@@ -536,7 +555,7 @@ func (sw *Switch) acceptRoutine() {
|
||||
if in >= sw.config.MaxNumInboundPeers {
|
||||
sw.Logger.Info(
|
||||
"Ignoring inbound connection: already have enough inbound peers",
|
||||
"address", p.NodeInfo().NetAddress().String(),
|
||||
"address", p.SocketAddr(),
|
||||
"have", in,
|
||||
"max", sw.config.MaxNumInboundPeers,
|
||||
)
|
||||
@@ -653,7 +672,7 @@ func (sw *Switch) addPeer(p Peer) error {
|
||||
return err
|
||||
}
|
||||
|
||||
p.SetLogger(sw.Logger.With("peer", p.NodeInfo().NetAddress()))
|
||||
p.SetLogger(sw.Logger.With("peer", p.SocketAddr()))
|
||||
|
||||
// Handle the shut down case where the switch has stopped but we're
|
||||
// concurrently trying to add a peer.
|
||||
|
@@ -161,10 +161,6 @@ func assertMsgReceivedWithTimeout(t *testing.T, msgBytes []byte, channel byte, r

func TestSwitchFiltersOutItself(t *testing.T) {
s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc)
// addr := s1.NodeInfo().NetAddress()

// // add ourselves like we do in node.go#427
// s1.addrBook.AddOurAddress(addr)

// simulate s1 having a public IP by creating a remote peer with the same ID
rp := &remotePeer{PrivKey: s1.nodeKey.PrivKey, Config: cfg}
@@ -498,7 +494,7 @@ func TestSwitchAcceptRoutine(t *testing.T) {
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
remotePeers = append(remotePeers, rp)
rp.Start()
c, err := rp.Dial(sw.NodeInfo().NetAddress())
c, err := rp.Dial(sw.NetAddress())
require.NoError(t, err)
// spawn a reading routine to prevent connection from closing
go func(c net.Conn) {
@@ -517,7 +513,7 @@ func TestSwitchAcceptRoutine(t *testing.T) {
// 2. check we close new connections if we already have MaxNumInboundPeers peers
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
rp.Start()
conn, err := rp.Dial(sw.NodeInfo().NetAddress())
conn, err := rp.Dial(sw.NetAddress())
require.NoError(t, err)
// check conn is closed
one := make([]byte, 1)
@@ -537,6 +533,10 @@ type errorTransport struct {
acceptErr error
}

func (et errorTransport) NetAddress() NetAddress {
panic("not implemented")
}

func (et errorTransport) Accept(c peerConfig) (Peer, error) {
return nil, et.acceptErr
}
@@ -626,7 +626,7 @@ func (book *addrBookMock) OurAddress(addr *NetAddress) bool {
_, ok := book.ourAddrs[addr.String()]
return ok
}
func (book *addrBookMock) MarkGood(*NetAddress) {}
func (book *addrBookMock) MarkGood(ID) {}
func (book *addrBookMock) HasAddress(addr *NetAddress) bool {
_, ok := book.addrs[addr.String()]
return ok
@@ -23,7 +23,7 @@ type mockNodeInfo struct {
}

func (ni mockNodeInfo) ID() ID { return ni.addr.ID }
func (ni mockNodeInfo) NetAddress() *NetAddress { return ni.addr }
func (ni mockNodeInfo) NetAddress() (*NetAddress, error) { return ni.addr, nil }
func (ni mockNodeInfo) Validate() error { return nil }
func (ni mockNodeInfo) CompatibleWith(other NodeInfo) error { return nil }

@@ -35,7 +35,8 @@ func CreateRandomPeer(outbound bool) *peer {
addr, netAddr := CreateRoutableAddr()
p := &peer{
peerConn: peerConn{
outbound: outbound,
outbound: outbound,
socketAddr: netAddr,
},
nodeInfo: mockNodeInfo{netAddr},
mconn: &conn.MConnection{},
@@ -174,10 +175,15 @@ func MakeSwitch(
PrivKey: ed25519.GenPrivKey(),
}
nodeInfo := testNodeInfo(nodeKey.ID(), fmt.Sprintf("node%d", i))
addr, err := NewNetAddressString(
IDAddressString(nodeKey.ID(), nodeInfo.(DefaultNodeInfo).ListenAddr),
)
if err != nil {
panic(err)
}

t := NewMultiplexTransport(nodeInfo, nodeKey, MConnConfig(cfg))

addr := nodeInfo.NetAddress()
if err := t.Listen(*addr); err != nil {
panic(err)
}
@@ -214,7 +220,7 @@ func testPeerConn(
cfg *config.P2PConfig,
outbound, persistent bool,
ourNodePrivKey crypto.PrivKey,
originalAddr *NetAddress,
socketAddr *NetAddress,
) (pc peerConn, err error) {
conn := rawConn

@@ -231,12 +237,7 @@ func testPeerConn(
}

// Only the information we already have
return peerConn{
outbound: outbound,
persistent: persistent,
conn: conn,
originalAddr: originalAddr,
}, nil
return newPeerConn(outbound, persistent, conn, socketAddr), nil
}

//----------------------------------------------------------------
@@ -24,6 +24,7 @@ type IPResolver interface {
// accept is the container to carry the upgraded connection and NodeInfo from an
// asynchronously running routine to the Accept method.
type accept struct {
netAddr *NetAddress
conn net.Conn
nodeInfo NodeInfo
err error
@@ -47,6 +48,9 @@ type peerConfig struct {
// the transport. Each transport is also responsible to filter establishing
// peers specific to its domain.
type Transport interface {
// Listening address.
NetAddress() NetAddress

// Accept returns a newly connected Peer.
Accept(peerConfig) (Peer, error)

@@ -115,6 +119,7 @@ func MultiplexTransportResolver(resolver IPResolver) MultiplexTransportOption {
// MultiplexTransport accepts and dials tcp connections and upgrades them to
// multiplexed peers.
type MultiplexTransport struct {
netAddr NetAddress
listener net.Listener

acceptc chan accept
@@ -161,6 +166,11 @@ func NewMultiplexTransport(
}
}

// NetAddress implements Transport.
func (mt *MultiplexTransport) NetAddress() NetAddress {
return mt.netAddr
}

// Accept implements Transport.
func (mt *MultiplexTransport) Accept(cfg peerConfig) (Peer, error) {
select {
@@ -173,7 +183,7 @@ func (mt *MultiplexTransport) Accept(cfg peerConfig) (Peer, error) {

cfg.outbound = false

return mt.wrapPeer(a.conn, a.nodeInfo, cfg, nil), nil
return mt.wrapPeer(a.conn, a.nodeInfo, cfg, a.netAddr), nil
case <-mt.closec:
return nil, ErrTransportClosed{}
}
@@ -224,6 +234,7 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error {
return err
}

mt.netAddr = addr
mt.listener = ln

go mt.acceptPeers()
@@ -258,15 +269,21 @@ func (mt *MultiplexTransport) acceptPeers() {
var (
nodeInfo NodeInfo
secretConn *conn.SecretConnection
netAddr *NetAddress
)

err := mt.filterConn(c)
if err == nil {
secretConn, nodeInfo, err = mt.upgrade(c, nil)
if err == nil {
addr := c.RemoteAddr()
id := PubKeyToID(secretConn.RemotePubKey())
netAddr = NewNetAddress(id, addr)
}
}

select {
case mt.acceptc <- accept{secretConn, nodeInfo, err}:
case mt.acceptc <- accept{netAddr, secretConn, nodeInfo, err}:
// Make the upgraded peer available.
case <-mt.closec:
// Give up if the transport was closed.
@@ -347,7 +364,7 @@ func (mt *MultiplexTransport) upgrade(
if err != nil {
return nil, nil, ErrRejected{
conn: c,
err: fmt.Errorf("secrect conn failed: %v", err),
err: fmt.Errorf("secret conn failed: %v", err),
isAuthFailure: true,
}
}
@@ -360,7 +377,7 @@ func (mt *MultiplexTransport) upgrade(
conn: c,
id: connID,
err: fmt.Errorf(
"conn.ID (%v) dialed ID (%v) missmatch",
"conn.ID (%v) dialed ID (%v) mismatch",
connID,
dialedID,
),
@@ -392,7 +409,7 @@ func (mt *MultiplexTransport) upgrade(
conn: c,
id: connID,
err: fmt.Errorf(
"conn.ID (%v) NodeInfo.ID (%v) missmatch",
"conn.ID (%v) NodeInfo.ID (%v) mismatch",
connID,
nodeInfo.ID(),
),
@@ -426,14 +443,14 @@ func (mt *MultiplexTransport) wrapPeer(
c net.Conn,
ni NodeInfo,
cfg peerConfig,
dialedAddr *NetAddress,
socketAddr *NetAddress,
) Peer {

peerConn := newPeerConn(
cfg.outbound,
cfg.persistent,
c,
dialedAddr,
socketAddr,
)

p := newPeer(
@@ -8,6 +8,8 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/p2p/conn"
)
@@ -142,43 +144,23 @@ func TestTransportMultiplexConnFilterTimeout(t *testing.T) {

func TestTransportMultiplexAcceptMultiple(t *testing.T) {
mt := testSetupMultiplexTransport(t)
id, addr := mt.nodeKey.ID(), mt.listener.Addr().String()
laddr, err := NewNetAddressStringWithOptionalID(IDAddressString(id, addr))
require.NoError(t, err)

var (
seed = rand.New(rand.NewSource(time.Now().UnixNano()))
errc = make(chan error, seed.Intn(64)+64)
seed = rand.New(rand.NewSource(time.Now().UnixNano()))
nDialers = seed.Intn(64) + 64
errc = make(chan error, nDialers)
)

// Setup dialers.
for i := 0; i < cap(errc); i++ {
go func() {
var (
pv = ed25519.GenPrivKey()
dialer = newMultiplexTransport(
testNodeInfo(PubKeyToID(pv.PubKey()), defaultNodeName),
NodeKey{
PrivKey: pv,
},
)
)
addr, err := NewNetAddressStringWithOptionalID(IDAddressString(mt.nodeKey.ID(), mt.listener.Addr().String()))
if err != nil {
errc <- err
return
}

_, err = dialer.Dial(*addr, peerConfig{})
if err != nil {
errc <- err
return
}

// Signal that the connection was established.
errc <- nil
}()
for i := 0; i < nDialers; i++ {
go testDialer(*laddr, errc)
}

// Catch connection errors.
for i := 0; i < cap(errc); i++ {
for i := 0; i < nDialers; i++ {
if err := <-errc; err != nil {
t.Fatal(err)
}
@@ -216,6 +198,27 @@ func TestTransportMultiplexAcceptMultiple(t *testing.T) {
}
}

func testDialer(dialAddr NetAddress, errc chan error) {
var (
pv = ed25519.GenPrivKey()
dialer = newMultiplexTransport(
testNodeInfo(PubKeyToID(pv.PubKey()), defaultNodeName),
NodeKey{
PrivKey: pv,
},
)
)

_, err := dialer.Dial(dialAddr, peerConfig{})
if err != nil {
errc <- err
return
}

// Signal that the connection was established.
errc <- nil
}

func TestTransportMultiplexAcceptNonBlocking(t *testing.T) {
mt := testSetupMultiplexTransport(t)

@@ -591,6 +594,7 @@ func TestTransportHandshake(t *testing.T) {
}
}

// create listener
func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport {
var (
pv = ed25519.GenPrivKey()
@@ -218,7 +218,7 @@ func DumpConsensusState(ctx *rpctypes.Context) (*ctypes.ResultDumpConsensusState
}
peerStates[i] = ctypes.PeerStateInfo{
// Peer basic info.
NodeAddress: peer.NodeInfo().NetAddress().String(),
NodeAddress: peer.SocketAddr().String(),
// Peer consensus state.
PeerState: peerStateJSON,
}
@@ -29,12 +29,12 @@ def request(org, repo, data):
return json.loads(responsedata)


def create_draft(org,repo,version):
def create_draft(org,repo,branch,version):
draft = {
'tag_name': version,
'target_commitish': 'master',
'target_commitish': '{0}'.format(branch),
'name': '{0} (WARNING: ALPHA SOFTWARE)'.format(version),
'body': '<a href=https://github.com/{0}/{1}/blob/master/CHANGELOG.md#{2}>https://github.com/{0}/{1}/blob/master/CHANGELOG.md#{2}</a>'.format(org,repo,version.replace('v','').replace('.','')),
'body': '<a href=https://github.com/{0}/{1}/blob/{2}/CHANGELOG.md#{3}>https://github.com/{0}/{1}/blob/{2}/CHANGELOG.md#{3}</a>'.format(org,repo,branch,version.replace('.','')),
'draft': True,
'prerelease': False
}
@@ -45,6 +45,7 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--org", default="tendermint", help="GitHub organization")
parser.add_argument("--repo", default="tendermint", help="GitHub repository")
parser.add_argument("--branch", default=os.environ.get('CIRCLE_BRANCH'), help="Branch to build from, e.g.: v1.0")
parser.add_argument("--version", default=os.environ.get('CIRCLE_TAG'), help="Version number for binary, e.g.: v1.0.0")
args = parser.parse_args()

@@ -54,7 +55,7 @@ if __name__ == "__main__":
if not os.environ.has_key('GITHUB_TOKEN'):
raise parser.error('environment variable GITHUB_TOKEN is required')

release = create_draft(args.org,args.repo,args.version)
release = create_draft(args.org,args.repo,args.branch,args.version)

print(release["id"])
@@ -9,6 +9,14 @@ import (
"github.com/tendermint/tendermint/types"
)

const (
// persist validators every valSetCheckpointInterval blocks to avoid
// LoadValidators taking too much time.
// https://github.com/tendermint/tendermint/pull/3438
// 100000 results in ~ 100ms to get 100 validators (see BenchmarkLoadValidators)
valSetCheckpointInterval = 100000
)

//------------------------------------------------------------------------

func calcValidatorsKey(height int64) []byte {
@@ -182,25 +190,38 @@ func LoadValidators(db dbm.DB, height int64) (*types.ValidatorSet, error) {
if valInfo == nil {
return nil, ErrNoValSetForHeight{height}
}

if valInfo.ValidatorSet == nil {
valInfo2 := loadValidatorsInfo(db, valInfo.LastHeightChanged)
lastStoredHeight := lastStoredHeightFor(height, valInfo.LastHeightChanged)
valInfo2 := loadValidatorsInfo(db, lastStoredHeight)
if valInfo2 == nil {
panic(
fmt.Sprintf(
"Couldn't find validators at height %d as last changed from height %d",
valInfo.LastHeightChanged,
height,
),
)
// TODO (melekes): remove the below if condition in the 0.33 major
// release and just panic. Old chains might panic otherwise if they
// haven't saved validators at intermediate (%valSetCheckpointInterval)
// height yet.
// https://github.com/tendermint/tendermint/issues/3543
valInfo2 = loadValidatorsInfo(db, valInfo.LastHeightChanged)
lastStoredHeight = valInfo.LastHeightChanged
if valInfo2 == nil {
panic(
fmt.Sprintf("Couldn't find validators at height %d (height %d was originally requested)",
lastStoredHeight,
height,
),
)
}
}
valInfo2.ValidatorSet.IncrementProposerPriority(int(height - valInfo.LastHeightChanged)) // mutate
valInfo2.ValidatorSet.IncrementProposerPriority(int(height - lastStoredHeight)) // mutate
valInfo = valInfo2
}

return valInfo.ValidatorSet, nil
}

func lastStoredHeightFor(height, lastHeightChanged int64) int64 {
checkpointHeight := height - height%valSetCheckpointInterval
return cmn.MaxInt64(checkpointHeight, lastHeightChanged)
}

// CONTRACT: Returned ValidatorsInfo can be mutated.
func loadValidatorsInfo(db dbm.DB, height int64) *ValidatorsInfo {
buf := db.Get(calcValidatorsKey(height))
@@ -221,10 +242,10 @@ func loadValidatorsInfo(db dbm.DB, height int64) *ValidatorsInfo {
}

// saveValidatorsInfo persists the validator set.
// `height` is the effective height for which the validator is responsible for signing.
// It should be called from s.Save(), right before the state itself is persisted.
// If the validator set did not change after processing the latest block,
// only the last height for which the validators changed is persisted.
//
// `height` is the effective height for which the validator is responsible for
// signing. It should be called from s.Save(), right before the state itself is
// persisted.
func saveValidatorsInfo(db dbm.DB, height, lastHeightChanged int64, valSet *types.ValidatorSet) {
if lastHeightChanged > height {
panic("LastHeightChanged cannot be greater than ValidatorsInfo height")
@@ -232,7 +253,9 @@ func saveValidatorsInfo(db dbm.DB, height, lastHeightChanged int64, valSet *type
valInfo := &ValidatorsInfo{
LastHeightChanged: lastHeightChanged,
}
if lastHeightChanged == height {
// Only persist validator set if it was updated or checkpoint height (see
// valSetCheckpointInterval) is reached.
if height == lastHeightChanged || height%valSetCheckpointInterval == 0 {
valInfo.ValidatorSet = valSet
}
db.Set(calcValidatorsKey(height), valInfo.Bytes())
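The checkpointing change above persists the full validator set either when it changes or at every `valSetCheckpointInterval`-th height, and `LoadValidators` walks back to the nearest stored copy via `lastStoredHeightFor`. A self-contained sketch of that height arithmetic; the function body follows the diff, with `cmn.MaxInt64` replaced by a local helper so the snippet runs standalone:

```go
package main

import "fmt"

// Interval value taken from the const block shown in the diff.
const valSetCheckpointInterval = 100000

// maxInt64 stands in for cmn.MaxInt64 from tendermint's libs/common.
func maxInt64(a, b int64) int64 {
	if a > b {
		return a
	}
	return b
}

// lastStoredHeightFor returns the most recent height at or below `height`
// where a full validator set is guaranteed to be stored: the last checkpoint
// height or the height the set last changed, whichever is later.
func lastStoredHeightFor(height, lastHeightChanged int64) int64 {
	checkpointHeight := height - height%valSetCheckpointInterval
	return maxInt64(checkpointHeight, lastHeightChanged)
}

func main() {
	fmt.Println(lastStoredHeightFor(250000, 1))      // nearest checkpoint: 200000
	fmt.Println(lastStoredHeightFor(250000, 240000)) // set changed after checkpoint: 240000
}
```

Because a full set is never more than `valSetCheckpointInterval` blocks away, `LoadValidators` no longer has to replay arbitrarily far back, which is the fix for the `ExecCommitBlock` and `/validators` slowdowns mentioned in the release notes.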
67
state/store_test.go
Normal file
@@ -0,0 +1,67 @@
package state

import (
"fmt"
"os"
"testing"

"github.com/stretchr/testify/assert"

cfg "github.com/tendermint/tendermint/config"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/types"
)

func TestSaveValidatorsInfo(t *testing.T) {
// test we persist validators every valSetCheckpointInterval blocks
stateDB := dbm.NewMemDB()
val, _ := types.RandValidator(true, 10)
vals := types.NewValidatorSet([]*types.Validator{val})

// TODO(melekes): remove in 0.33 release
// https://github.com/tendermint/tendermint/issues/3543
saveValidatorsInfo(stateDB, 1, 1, vals)
saveValidatorsInfo(stateDB, 2, 1, vals)
assert.NotPanics(t, func() {
_, err := LoadValidators(stateDB, 2)
if err != nil {
panic(err)
}
})
//ENDREMOVE

saveValidatorsInfo(stateDB, valSetCheckpointInterval, 1, vals)

loadedVals, err := LoadValidators(stateDB, valSetCheckpointInterval)
assert.NoError(t, err)
assert.NotZero(t, loadedVals.Size())
}

func BenchmarkLoadValidators(b *testing.B) {
const valSetSize = 100

config := cfg.ResetTestRoot("state_")
defer os.RemoveAll(config.RootDir)
dbType := dbm.DBBackendType(config.DBBackend)
stateDB := dbm.NewDB("state", dbType, config.DBDir())
state, err := LoadStateFromDBOrGenesisFile(stateDB, config.GenesisFile())
if err != nil {
b.Fatal(err)
}
state.Validators = genValSet(valSetSize)
state.NextValidators = state.Validators.CopyIncrementProposerPriority(1)
SaveState(stateDB, state)

for i := 10; i < 10000000000; i *= 10 { // 10, 100, 1000, ...
saveValidatorsInfo(stateDB, int64(i), state.LastHeightValidatorsChanged, state.NextValidators)

b.Run(fmt.Sprintf("height=%d", i), func(b *testing.B) {
for n := 0; n < b.N; n++ {
_, err := LoadValidators(stateDB, int64(i))
if err != nil {
b.Fatal(err)
}
}
})
}
}
@@ -20,7 +20,7 @@ const (
// Must be a string because scripts like dist.sh read this file.
// XXX: Don't change the name of this variable or you will break
// automation :)
TMCoreSemVer = "0.31.1"
TMCoreSemVer = "0.31.4"

// ABCISemVer is the semantic version of the ABCI library
ABCISemVer = "0.16.0"