diff --git a/.circleci/config.yml b/.circleci/config.yml
index 5669384c..ecc7c0ac 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -240,7 +240,7 @@ jobs:
           for pkg in $(go list github.com/tendermint/tendermint/... | circleci tests split --split-by=timings); do
             id=$(basename "$pkg")
-            GOCACHE=off go test -timeout 5m -race -coverprofile=/tmp/workspace/profiles/$id.out -covermode=atomic "$pkg" | tee "/tmp/logs/$id-$RANDOM.log"
+            GOCACHE=off go test -v -timeout 5m -race -coverprofile=/tmp/workspace/profiles/$id.out -covermode=atomic "$pkg" | tee "/tmp/logs/$id-$RANDOM.log"
           done
     - persist_to_workspace:
         root: /tmp/workspace
diff --git a/CHANGELOG.md b/CHANGELOG.md
index d79fb867..437b7970 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,28 @@
 # Changelog
 
+## v0.29.1
+
+*January 24, 2019*
+
+Special thanks to external contributors on this release:
+@infinytum, @gauthamzz
+
+This release contains two important fixes: one for the p2p layer, where we
+sometimes were not closing connections, and one for the consensus layer, where
+consensus with no empty blocks (`create_empty_blocks = false`) could halt.
+
+Friendly reminder: we have a [bug bounty
+program](https://hackerone.com/tendermint).
+
+### IMPROVEMENTS:
+- [pex] [\#3037](https://github.com/tendermint/tendermint/issues/3037) Only log "Reached max attempts to dial" once
+- [rpc] [\#3159](https://github.com/tendermint/tendermint/issues/3159) Expose
+  `triggered_timeout_commit` in the `/dump_consensus_state` output
+
+### BUG FIXES:
+- [consensus] [\#3199](https://github.com/tendermint/tendermint/issues/3199) Fix consensus halt with no empty blocks, caused by not resetting `TriggeredTimeoutPrecommit`
+- [p2p] [\#2967](https://github.com/tendermint/tendermint/issues/2967) Fix file descriptor leak
+
 ## v0.29.0
 
 *January 21, 2019*
diff --git a/consensus/state.go b/consensus/state.go
index 26b07417..c69dfb87 100644
--- a/consensus/state.go
+++ b/consensus/state.go
@@ -94,7 +94,6 @@ type ConsensusState struct {
 	// internal state
 	mtx sync.RWMutex
 	cstypes.RoundState
-	triggeredTimeoutPrecommit bool
 	state sm.State // State until height-1.
 
 	// state changes may be triggered by: msgs from peers,
@@ -732,6 +731,7 @@ func (cs *ConsensusState) handleTxsAvailable() {
 	cs.mtx.Lock()
 	defer cs.mtx.Unlock()
 	// we only need to do this for round 0
+	cs.enterNewRound(cs.Height, 0)
 	cs.enterPropose(cs.Height, 0)
 }
 
@@ -782,7 +782,7 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) {
 		cs.ProposalBlockParts = nil
 	}
 	cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping
-	cs.triggeredTimeoutPrecommit = false
+	cs.TriggeredTimeoutPrecommit = false
 
 	cs.eventBus.PublishEventNewRound(cs.NewRoundEvent())
 	cs.metrics.Rounds.Set(float64(round))
@@ -1128,12 +1128,12 @@ func (cs *ConsensusState) enterPrecommit(height int64, round int) {
 func (cs *ConsensusState) enterPrecommitWait(height int64, round int) {
 	logger := cs.Logger.With("height", height, "round", round)
 
-	if cs.Height != height || round < cs.Round || (cs.Round == round && cs.triggeredTimeoutPrecommit) {
+	if cs.Height != height || round < cs.Round || (cs.Round == round && cs.TriggeredTimeoutPrecommit) {
		logger.Debug(
			fmt.Sprintf(
				"enterPrecommitWait(%v/%v): Invalid args. "+
"+ - "Current state is Height/Round: %v/%v/, triggeredTimeoutPrecommit:%v", - height, round, cs.Height, cs.Round, cs.triggeredTimeoutPrecommit)) + "Current state is Height/Round: %v/%v/, TriggeredTimeoutPrecommit:%v", + height, round, cs.Height, cs.Round, cs.TriggeredTimeoutPrecommit)) return } if !cs.Votes.Precommits(round).HasTwoThirdsAny() { @@ -1143,7 +1143,7 @@ func (cs *ConsensusState) enterPrecommitWait(height int64, round int) { defer func() { // Done enterPrecommitWait: - cs.triggeredTimeoutPrecommit = true + cs.TriggeredTimeoutPrecommit = true cs.newStep() }() diff --git a/consensus/state_test.go b/consensus/state_test.go index 40103e47..10c04fbc 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -1279,6 +1279,71 @@ func TestCommitFromPreviousRound(t *testing.T) { ensureNewRound(newRoundCh, height+1, 0) } +type fakeTxNotifier struct { + ch chan struct{} +} + +func (n *fakeTxNotifier) TxsAvailable() <-chan struct{} { + return n.ch +} + +func (n *fakeTxNotifier) Notify() { + n.ch <- struct{}{} +} + +func TestStartNextHeightCorrectly(t *testing.T) { + cs1, vss := randConsensusState(4) + cs1.config.SkipTimeoutCommit = false + cs1.txNotifier = &fakeTxNotifier{ch: make(chan struct{})} + + vs2, vs3, vs4 := vss[1], vss[2], vss[3] + height, round := cs1.Height, cs1.Round + + proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) + timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose) + + newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) + newBlockHeader := subscribe(cs1.eventBus, types.EventQueryNewBlockHeader) + addr := cs1.privValidator.GetPubKey().Address() + voteCh := subscribeToVoter(cs1, addr) + + // start round and wait for propose and prevote + startTestRound(cs1, height, round) + ensureNewRound(newRoundCh, height, round) + + ensureNewProposal(proposalCh, height, round) + rs := cs1.GetRoundState() + theBlockHash := rs.ProposalBlock.Hash() + theBlockParts := rs.ProposalBlockParts.Header() + + ensurePrevote(voteCh, height, round) + validatePrevote(t, cs1, round, vss[0], theBlockHash) + + signAddVotes(cs1, types.PrevoteType, theBlockHash, theBlockParts, vs2, vs3, vs4) + + ensurePrecommit(voteCh, height, round) + // the proposed block should now be locked and our precommit added + validatePrecommit(t, cs1, round, round, vss[0], theBlockHash, theBlockHash) + + rs = cs1.GetRoundState() + + // add precommits + signAddVotes(cs1, types.PrecommitType, nil, types.PartSetHeader{}, vs2) + signAddVotes(cs1, types.PrecommitType, theBlockHash, theBlockParts, vs3) + signAddVotes(cs1, types.PrecommitType, theBlockHash, theBlockParts, vs4) + + ensureNewBlockHeader(newBlockHeader, height, theBlockHash) + + rs = cs1.GetRoundState() + assert.True(t, rs.TriggeredTimeoutPrecommit) + + cs1.txNotifier.(*fakeTxNotifier).Notify() + + ensureNewTimeout(timeoutProposeCh, height+1, round, cs1.config.TimeoutPropose.Nanoseconds()) + rs = cs1.GetRoundState() + assert.False(t, rs.TriggeredTimeoutPrecommit, "triggeredTimeoutPrecommit should be false at the beginning of each round") +} + //------------------------------------------------------------------------------------------ // SlashingSuite // TODO: Slashing diff --git a/consensus/types/round_state.go b/consensus/types/round_state.go index 418f73a8..eab13b6c 100644 --- a/consensus/types/round_state.go +++ b/consensus/types/round_state.go @@ -65,25 +65,26 @@ func (rs RoundStepType) String() string { // NOTE: Not thread safe. 
 // of the cs.receiveRoutine
 type RoundState struct {
-	Height             int64               `json:"height"` // Height we are working on
-	Round              int                 `json:"round"`
-	Step               RoundStepType       `json:"step"`
-	StartTime          time.Time           `json:"start_time"`
-	CommitTime         time.Time           `json:"commit_time"` // Subjective time when +2/3 precommits for Block at Round were found
-	Validators         *types.ValidatorSet `json:"validators"`
-	Proposal           *types.Proposal     `json:"proposal"`
-	ProposalBlock      *types.Block        `json:"proposal_block"`
-	ProposalBlockParts *types.PartSet      `json:"proposal_block_parts"`
-	LockedRound        int                 `json:"locked_round"`
-	LockedBlock        *types.Block        `json:"locked_block"`
-	LockedBlockParts   *types.PartSet      `json:"locked_block_parts"`
-	ValidRound         int                 `json:"valid_round"`       // Last known round with POL for non-nil valid block.
-	ValidBlock         *types.Block        `json:"valid_block"`       // Last known block of POL mentioned above.
-	ValidBlockParts    *types.PartSet      `json:"valid_block_parts"` // Last known block parts of POL metnioned above.
-	Votes              *HeightVoteSet      `json:"votes"`
-	CommitRound        int                 `json:"commit_round"` //
-	LastCommit         *types.VoteSet      `json:"last_commit"` // Last precommits at Height-1
-	LastValidators     *types.ValidatorSet `json:"last_validators"`
+	Height                    int64               `json:"height"` // Height we are working on
+	Round                     int                 `json:"round"`
+	Step                      RoundStepType       `json:"step"`
+	StartTime                 time.Time           `json:"start_time"`
+	CommitTime                time.Time           `json:"commit_time"` // Subjective time when +2/3 precommits for Block at Round were found
+	Validators                *types.ValidatorSet `json:"validators"`
+	Proposal                  *types.Proposal     `json:"proposal"`
+	ProposalBlock             *types.Block        `json:"proposal_block"`
+	ProposalBlockParts        *types.PartSet      `json:"proposal_block_parts"`
+	LockedRound               int                 `json:"locked_round"`
+	LockedBlock               *types.Block        `json:"locked_block"`
+	LockedBlockParts          *types.PartSet      `json:"locked_block_parts"`
+	ValidRound                int                 `json:"valid_round"`       // Last known round with POL for non-nil valid block.
+	ValidBlock                *types.Block        `json:"valid_block"`       // Last known block of POL mentioned above.
+	ValidBlockParts           *types.PartSet      `json:"valid_block_parts"` // Last known block parts of POL mentioned above.
+	Votes                     *HeightVoteSet      `json:"votes"`
+	CommitRound               int                 `json:"commit_round"` //
+	LastCommit                *types.VoteSet      `json:"last_commit"` // Last precommits at Height-1
+	LastValidators            *types.ValidatorSet `json:"last_validators"`
+	TriggeredTimeoutPrecommit bool                `json:"triggered_timeout_precommit"`
 }
 
 // Compressed version of the RoundState for use in RPC
diff --git a/docs/architecture/adr-033-pubsub.md b/docs/architecture/adr-033-pubsub.md
index 0ef0cae6..c52bf44a 100644
--- a/docs/architecture/adr-033-pubsub.md
+++ b/docs/architecture/adr-033-pubsub.md
@@ -5,6 +5,8 @@ Author: Anton Kaliaev (@melekes)
 ## Changelog
 
 02-10-2018: Initial draft
+16-01-2019: Second version based on our conversation with Jae
+17-01-2019: Third version explaining how the new design solves current issues
 
 ## Context
 
@@ -40,7 +42,14 @@ goroutines can be used to avoid uncontrolled memory growth.
 
 In certain cases, this is what you want. But in our case, because we need
 strict ordering of events (if event A was published before B, the guaranteed
-delivery order will be A -> B), we can't use goroutines.
+delivery order will be A -> B), we can't publish a msg in a new goroutine
+every time.
+
+We can also have a goroutine per subscriber, although we'd need to be careful
+with the number of subscribers. It's also more difficult to implement, and it's
+unclear if we'll benefit from it (since we'd be forced to create N additional
+channels to distribute msgs to these goroutines).
+
+### Non-blocking send
 
 There is also a question whether we should have a non-blocking send:
 
@@ -56,15 +65,14 @@ for each subscriber {
 ```
 
 This fixes the "slow client problem", but there is no way for a slow client to
-know if it had missed a message. On the other hand, if we're going to stick
-with blocking send, **devs must always ensure subscriber's handling code does not
-block**. As you can see, there is an implicit choice between ordering guarantees
-and using goroutines.
+know if it has missed a message. We could return a second channel and close it
+to indicate subscription termination. On the other hand, if we're going to
+stick with blocking send, **devs must always ensure subscriber's handling code
+does not block**, which is a hard task to put on their shoulders.
 
 The interim option is to run a pool of goroutines for a single message and wait
 for all goroutines to finish. This will solve the "slow client problem", but
 we'd still have to wait `max(goroutine_X_time)` before we can publish the next
 message.
-My opinion: not worth doing.
 
 ### Channels vs Callbacks
 
@@ -76,8 +84,6 @@ memory leaks and/or memory usage increase.
 
 Go channels are de-facto standard for carrying data between goroutines.
 
-**Question: Is it worth switching to callback functions?**
-
 ### Why does `Subscribe()` accept an `out` channel?
 
 Because in our tests, we create buffered channels (cap: 1). Alternatively, we
@@ -85,27 +91,89 @@ can make capacity an argument.
 
 ## Decision
 
-Change Subscribe() function to return out channel:
+Change the Subscribe() function to return a `Subscription` struct:
 
 ```go
-// outCap can be used to set capacity of out channel (unbuffered by default).
-Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (out <-chan interface{}, err error) {
+type Subscription struct {
+	// private fields
+}
+
+func (s *Subscription) Out() <-chan MsgAndTags
+func (s *Subscription) Cancelled() <-chan struct{}
+func (s *Subscription) Err() error
 ```
 
-It's more idiomatic since we're closing it during Unsubscribe/UnsubscribeAll calls.
+Out returns a channel onto which messages and tags are published.
+Unsubscribe/UnsubscribeAll does not close the channel, to prevent clients from
+receiving a nil message.
 
-Also, we should make tags available to subscribers:
+Cancelled returns a channel that's closed when the subscription is terminated;
+it is meant to be used in a select statement.
+
+If Cancelled is not yet closed, Err() returns nil.
+If Cancelled is closed, Err returns a non-nil error explaining why:
+Unsubscribed if the subscriber chose to unsubscribe,
+OutOfCapacity if the subscriber is not pulling messages fast enough and the
+Out channel became full.
+After Err returns a non-nil error, successive calls to Err() return the same
+error.
+
+```go
+subscription, err := pubsub.Subscribe(...)
+if err != nil {
+	// ...
+}
+for {
+	select {
+	case msgAndTags := <-subscription.Out():
+		// ...
+	case <-subscription.Cancelled():
+		return subscription.Err()
+	}
+}
+```
+
+Make the Out() channel buffered (cap: 1) by default. In most cases, we want to
+terminate the slow subscriber. Only in rare cases do we want to block the
+pubsub (e.g. when debugging consensus). This should lower the chances of the
+pubsub being frozen.
+
+```go
+// outCap can be used to set capacity of the Out channel (1 by default). Set to
+// 0 for an unbuffered channel (WARNING: it may block the pubsub).
+Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (Subscription, error) {
+```
+
+Also, the Out() channel should return tags along with a message:
 
 ```go
 type MsgAndTags struct {
     Msg interface{}
     Tags TagMap
 }
-
-// outCap can be used to set capacity of out channel (unbuffered by default).
-Subscribe(ctx context.Context, clientID string, query Query, outCap... int) (out <-chan MsgAndTags, err error) {
 ```
+
+to inform clients of which Tags were used with the Msg.
+
+### How does this new design solve the current issues?
+
+https://github.com/tendermint/tendermint/issues/951 (https://github.com/tendermint/tendermint/issues/1880)
+
+Because of the non-blocking send, the situation where we'll deadlock is no
+longer possible. If the client stops reading messages, it will be removed.
+
+https://github.com/tendermint/tendermint/issues/1879
+
+MsgAndTags is now used instead of a plain message.
+
+### Future problems and their possible solutions
+
+https://github.com/tendermint/tendermint/issues/2826
+
+One question I am still pondering: how to prevent the pubsub from slowing down
+consensus. We can increase the pubsub queue size (which is 0 now). Also, it's
+probably a good idea to limit the total number of subscribers.
+
+This could be done automatically: say, we set the queue size to 1000 and, when
+it's >= 80% full, refuse new subscriptions.
+
 ## Status
 
 In review
@@ -116,7 +184,10 @@ In review
 
 - more idiomatic interface
 - subscribers know what tags msg was published with
+- subscribers are aware of the reason their subscription was cancelled
 
 ### Negative
 
+- (since v1) no concurrency when it comes to publishing messages
+
 ### Neutral
diff --git a/docs/networks/docker-compose.md b/docs/networks/docker-compose.md
index b7818c3b..7e4adde8 100644
--- a/docs/networks/docker-compose.md
+++ b/docs/networks/docker-compose.md
@@ -78,6 +78,78 @@ cd $GOPATH/src/github.com/tendermint/tendermint
 rm -rf ./build/node*
 ```
 
+## Configuring ABCI containers
+
+To use your own ABCI applications with the 4-node setup, edit the
+[docker-compose.yaml](https://github.com/tendermint/tendermint/blob/develop/docker-compose.yml)
+file and add an image for your ABCI application.
+
+```
+  abci0:
+    container_name: abci0
+    image: "abci-image"
+    build:
+      context: .
+      dockerfile: abci.Dockerfile
+    command:
+    networks:
+      localnet:
+        ipv4_address: 192.167.10.6
+
+  abci1:
+    container_name: abci1
+    image: "abci-image"
+    build:
+      context: .
+      dockerfile: abci.Dockerfile
+    command:
+    networks:
+      localnet:
+        ipv4_address: 192.167.10.7
+
+  abci2:
+    container_name: abci2
+    image: "abci-image"
+    build:
+      context: .
+      dockerfile: abci.Dockerfile
+    command:
+    networks:
+      localnet:
+        ipv4_address: 192.167.10.8
+
+  abci3:
+    container_name: abci3
+    image: "abci-image"
+    build:
+      context: .
+      dockerfile: abci.Dockerfile
+    command:
+    networks:
+      localnet:
+        ipv4_address: 192.167.10.9
+
+```
+
+Override the [command](https://github.com/tendermint/tendermint/blob/master/networks/local/localnode/Dockerfile#L12)
+in each node to connect to its ABCI application.
+
+```
+  node0:
+    container_name: node0
+    image: "tendermint/localnode"
+    ports:
+      - "26656-26657:26656-26657"
+    environment:
+      - ID=0
+      - LOG=$${LOG:-tendermint.log}
+    volumes:
+      - ./build:/tendermint:Z
+    command: node --proxy_app=tcp://abci0:26658
+    networks:
+      localnet:
+        ipv4_address: 192.167.10.2
+```
+
+Do the same for node1, node2, and node3, then [run the
+testnet](https://github.com/tendermint/tendermint/blob/master/docs/networks/docker-compose.md#run-a-testnet).
+
 ## Logging
 
 Log is saved under the attached volume, in the `tendermint.log` file. If the
diff --git a/lite/doc.go b/lite/doc.go
index 429b096e..c02b5021 100644
--- a/lite/doc.go
+++ b/lite/doc.go
@@ -15,9 +15,7 @@ for you, so you can just build nice applications.
 We design for clients who have no strong trust relationship with any Tendermint
 node, just the blockchain and validator set as a whole.
 
-# Data structures
-
-## SignedHeader
+SignedHeader
 
 SignedHeader is a block header along with a commit -- enough validator
 precommit-vote signatures to prove its validity (> 2/3 of the voting power)
@@ -42,7 +40,7 @@ The FullCommit is also declared in this package as a convenience structure,
 which includes the SignedHeader along with the full current and next
 ValidatorSets.
 
-## Verifier
+Verifier
 
 A Verifier validates a new SignedHeader given the currently known state. There
 are two different types of Verifiers provided.
@@ -56,35 +54,32 @@ greater).
 DynamicVerifier - this Verifier implements an auto-update and persistence
 strategy to verify any SignedHeader of the blockchain.
 
-## Provider and PersistentProvider
+Provider and PersistentProvider
 
 A Provider allows us to store and retrieve the FullCommits.
 
-```go
-type Provider interface {
-	// LatestFullCommit returns the latest commit with
-	// minHeight <= height <= maxHeight.
-	// If maxHeight is zero, returns the latest where
-	// minHeight <= height.
-	LatestFullCommit(chainID string, minHeight, maxHeight int64) (FullCommit, error)
-}
-```
+	type Provider interface {
+		// LatestFullCommit returns the latest commit with
+		// minHeight <= height <= maxHeight.
+		// If maxHeight is zero, returns the latest where
+		// minHeight <= height.
+		LatestFullCommit(chainID string, minHeight, maxHeight int64) (FullCommit, error)
+	}
 
 * client.NewHTTPProvider - query Tendermint rpc.
 
 A PersistentProvider is a Provider that also allows for saving state. This is
 used by the DynamicVerifier for persistence.
 
-```go
-type PersistentProvider interface {
-	Provider
-
-	// SaveFullCommit saves a FullCommit (without verification).
-	SaveFullCommit(fc FullCommit) error
-}
-```
+	type PersistentProvider interface {
+		Provider
+
+		// SaveFullCommit saves a FullCommit (without verification).
+		SaveFullCommit(fc FullCommit) error
+	}
 
 * DBProvider - persistence provider for use with any libs/DB.
+ * MultiProvider - combine multiple providers.
 
 The suggested use for local light clients is client.NewHTTPProvider(...) for
@@ -93,7 +88,7 @@ dbm.NewMemDB()), NewDBProvider("label", db.NewFileDB(...)))
 to store confirmed full commits (Trusted)
 
-# How We Track Validators
+How We Track Validators
 
 Unless you want to blindly trust the node you talk with, you need to trace
 every response back to a hash in a block header and validate the commit
diff --git a/p2p/conn_set.go b/p2p/conn_set.go
index f960c0e8..d6462278 100644
--- a/p2p/conn_set.go
+++ b/p2p/conn_set.go
@@ -11,6 +11,7 @@ type ConnSet interface {
 	HasIP(net.IP) bool
 	Set(net.Conn, []net.IP)
 	Remove(net.Conn)
+	RemoveAddr(net.Addr)
 }
 
 type connSetItem struct {
@@ -62,6 +63,13 @@ func (cs *connSet) Remove(c net.Conn) {
 	delete(cs.conns, c.RemoteAddr().String())
 }
 
+func (cs *connSet) RemoveAddr(addr net.Addr) {
+	cs.Lock()
+	defer cs.Unlock()
+
+	delete(cs.conns, addr.String())
+}
+
 func (cs *connSet) Set(c net.Conn, ips []net.IP) {
 	cs.Lock()
 	defer cs.Unlock()
diff --git a/p2p/dummy/peer.go b/p2p/dummy/peer.go
index 71def27e..57edafc6 100644
--- a/p2p/dummy/peer.go
+++ b/p2p/dummy/peer.go
@@ -55,6 +55,16 @@ func (p *peer) RemoteIP() net.IP {
 	return net.ParseIP("127.0.0.1")
 }
 
+// RemoteAddr always returns 127.0.0.1:8800.
+func (p *peer) RemoteAddr() net.Addr {
+	return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8800}
+}
+
+// CloseConn always returns nil.
+func (p *peer) CloseConn() error {
+	return nil
+}
+
 // Status always returns empty connection status.
 func (p *peer) Status() tmconn.ConnectionStatus {
 	return tmconn.ConnectionStatus{}
diff --git a/p2p/peer.go b/p2p/peer.go
index da301d49..73332a2a 100644
--- a/p2p/peer.go
+++ b/p2p/peer.go
@@ -18,15 +18,18 @@ type Peer interface {
 	cmn.Service
 	FlushStop()
 
-	ID() ID           // peer's cryptographic ID
-	RemoteIP() net.IP // remote IP of the connection
+	ID() ID               // peer's cryptographic ID
+	RemoteIP() net.IP     // remote IP of the connection
+	RemoteAddr() net.Addr // remote address of the connection
 
 	IsOutbound() bool   // did we dial the peer
 	IsPersistent() bool // do we redial this peer when we disconnect
 
+	CloseConn() error // close original connection
+
 	NodeInfo() NodeInfo // peer's info
 	Status() tmconn.ConnectionStatus
-	OriginalAddr() *NetAddress
+	OriginalAddr() *NetAddress // original address for outbound peers
 
 	Send(byte, []byte) bool
 	TrySend(byte, []byte) bool
@@ -296,6 +299,11 @@ func (p *peer) hasChannel(chID byte) bool {
 	return false
 }
 
+// CloseConn closes the original connection. Used for cleaning up in cases
+// where the peer had not been started at all.
+func (p *peer) CloseConn() error {
+	return p.peerConn.conn.Close()
+}
+
 //---------------------------------------------------
 // methods only used for testing
 // TODO: can we remove these?
@@ -305,8 +313,8 @@ func (pc *peerConn) CloseConn() {
 	pc.conn.Close() // nolint: errcheck
 }
 
-// Addr returns peer's remote network address.
-func (p *peer) Addr() net.Addr {
+// RemoteAddr returns peer's remote network address.
+func (p *peer) RemoteAddr() net.Addr {
 	return p.peerConn.conn.RemoteAddr()
 }
 
diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go
index 3eb5357d..1d2372fb 100644
--- a/p2p/peer_set_test.go
+++ b/p2p/peer_set_test.go
@@ -30,6 +30,8 @@ func (mp *mockPeer) Get(s string) interface{}  { return s }
 func (mp *mockPeer) Set(string, interface{})   {}
 func (mp *mockPeer) RemoteIP() net.IP          { return mp.ip }
 func (mp *mockPeer) OriginalAddr() *NetAddress { return nil }
+func (mp *mockPeer) RemoteAddr() net.Addr      { return &net.TCPAddr{IP: mp.ip, Port: 8800} }
+func (mp *mockPeer) CloseConn() error          { return nil }
 
 // Returns a mock peer
 func newMockPeer(ip net.IP) *mockPeer {
diff --git a/p2p/peer_test.go b/p2p/peer_test.go
index e53d6013..90be3113 100644
--- a/p2p/peer_test.go
+++ b/p2p/peer_test.go
@@ -39,7 +39,7 @@ func TestPeerBasic(t *testing.T) {
 	assert.False(p.IsPersistent())
 	p.persistent = true
 	assert.True(p.IsPersistent())
-	assert.Equal(rp.Addr().DialString(), p.Addr().String())
+	assert.Equal(rp.Addr().DialString(), p.RemoteAddr().String())
 	assert.Equal(rp.ID(), p.ID())
 }
 
@@ -137,9 +137,9 @@ type remotePeer struct {
 	PrivKey    crypto.PrivKey
 	Config     *config.P2PConfig
 	addr       *NetAddress
-	quit       chan struct{}
 	channels   cmn.HexBytes
 	listenAddr string
+	listener   net.Listener
 }
 
 func (rp *remotePeer) Addr() *NetAddress {
@@ -159,25 +159,45 @@ func (rp *remotePeer) Start() {
 	if e != nil {
 		golog.Fatalf("net.Listen tcp :0: %+v", e)
 	}
+	rp.listener = l
 	rp.addr = NewNetAddress(PubKeyToID(rp.PrivKey.PubKey()), l.Addr())
-	rp.quit = make(chan struct{})
 	if rp.channels == nil {
 		rp.channels = []byte{testCh}
 	}
-	go rp.accept(l)
+	go rp.accept()
 }
 
 func (rp *remotePeer) Stop() {
-	close(rp.quit)
+	rp.listener.Close()
 }
 
-func (rp *remotePeer) accept(l net.Listener) {
+func (rp *remotePeer) Dial(addr *NetAddress) (net.Conn, error) {
+	conn, err := addr.DialTimeout(1 * time.Second)
+	if err != nil {
+		return nil, err
+	}
+	pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey)
+	if err != nil {
+		return nil, err
+	}
+	_, err = handshake(pc.conn, time.Second, rp.nodeInfo())
+	if err != nil {
+		return nil, err
+	}
+	return conn, err
+}
+
+func (rp *remotePeer) accept() {
 	conns := []net.Conn{}
 
 	for {
-		conn, err := l.Accept()
+		conn, err := rp.listener.Accept()
 		if err != nil {
-			golog.Fatalf("Failed to accept conn: %+v", err)
+			golog.Printf("Failed to accept conn: %+v", err)
+			for _, conn := range conns {
+				_ = conn.Close()
+			}
+			return
 		}
 
 		pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey)
@@ -185,31 +205,20 @@ func (rp *remotePeer) accept(l net.Listener) {
 			golog.Fatalf("Failed to create a peer: %+v", err)
 		}
 
-		_, err = handshake(pc.conn, time.Second, rp.nodeInfo(l))
+		_, err = handshake(pc.conn, time.Second, rp.nodeInfo())
 		if err != nil {
 			golog.Fatalf("Failed to perform handshake: %+v", err)
 		}
 
 		conns = append(conns, conn)
-
-		select {
-		case <-rp.quit:
-			for _, conn := range conns {
-				if err := conn.Close(); err != nil {
-					golog.Fatal(err)
-				}
-			}
-			return
-		default:
-		}
 	}
 }
 
-func (rp *remotePeer) nodeInfo(l net.Listener) NodeInfo {
+func (rp *remotePeer) nodeInfo() NodeInfo {
 	return DefaultNodeInfo{
 		ProtocolVersion: defaultProtocolVersion,
 		ID_:             rp.Addr().ID,
-		ListenAddr:      l.Addr().String(),
+		ListenAddr:      rp.listener.Addr().String(),
 		Network:         "testing",
 		Version:         "1.2.3-rc0-deadbeef",
 		Channels:        rp.channels,
diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go
index 057aadaa..0b043ca8 100644
--- a/p2p/pex/pex_reactor.go
+++ b/p2p/pex/pex_reactor.go
@@ -471,7 +471,11 @@ func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) {
 	attempts, lastDialed := r.dialAttemptsInfo(addr)
 
 	if attempts > maxAttemptsToDial {
-		r.Logger.Error("Reached max attempts to dial", "addr", addr, "attempts", attempts)
+		// Do not log the message if the addr gets re-added.
+		if attempts+1 == maxAttemptsToDial {
+			r.Logger.Info("Reached max attempts to dial", "addr", addr, "attempts", attempts)
+			r.attemptsToDial.Store(addr.DialString(), _attemptsToDial{attempts + 1, time.Now()})
+		}
 		r.book.MarkBad(addr)
 		return
 	}
diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go
index 2e2f3f24..f5125c60 100644
--- a/p2p/pex/pex_reactor_test.go
+++ b/p2p/pex/pex_reactor_test.go
@@ -404,6 +404,8 @@ func (mockPeer) TrySend(byte, []byte) bool     { return false }
 func (mockPeer) Set(string, interface{})       {}
 func (mockPeer) Get(string) interface{}        { return nil }
 func (mockPeer) OriginalAddr() *p2p.NetAddress { return nil }
+func (mockPeer) RemoteAddr() net.Addr          { return &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8800} }
+func (mockPeer) CloseConn() error              { return nil }
 
 func assertPeersWithTimeout(
 	t *testing.T,
diff --git a/p2p/switch.go b/p2p/switch.go
index 0490eebb..dbd9c2a6 100644
--- a/p2p/switch.go
+++ b/p2p/switch.go
@@ -210,6 +210,7 @@ func (sw *Switch) OnStart() error {
 func (sw *Switch) OnStop() {
 	// Stop peers
 	for _, p := range sw.peers.List() {
+		sw.transport.Cleanup(p)
 		p.Stop()
 		if sw.peers.Remove(p) {
 			sw.metrics.Peers.Add(float64(-1))
@@ -304,6 +305,7 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) {
 	if sw.peers.Remove(peer) {
 		sw.metrics.Peers.Add(float64(-1))
 	}
+	sw.transport.Cleanup(peer)
 	peer.Stop()
 	for _, reactor := range sw.reactors {
 		reactor.RemovePeer(peer, reason)
@@ -529,13 +531,16 @@ func (sw *Switch) acceptRoutine() {
 				"max", sw.config.MaxNumInboundPeers,
 			)
 
-			_ = p.Stop()
+			sw.transport.Cleanup(p)
 
 			continue
 		}
 
 		if err := sw.addPeer(p); err != nil {
-			_ = p.Stop()
+			sw.transport.Cleanup(p)
+			if p.IsRunning() {
+				_ = p.Stop()
+			}
 			sw.Logger.Info(
 				"Ignoring inbound connection: error while adding peer",
 				"err", err,
@@ -593,7 +598,10 @@ func (sw *Switch) addOutboundPeerWithConfig(
 	}
 
 	if err := sw.addPeer(p); err != nil {
-		_ = p.Stop()
+		sw.transport.Cleanup(p)
+		if p.IsRunning() {
+			_ = p.Stop()
+		}
 		return err
 	}
 
@@ -628,7 +636,8 @@ func (sw *Switch) filterPeer(p Peer) error {
 	return nil
 }
 
-// addPeer starts up the Peer and adds it to the Switch.
+// addPeer starts up the Peer and adds it to the Switch. An error is returned
+// if the peer is filtered out, fails to start, or can't be added.
 func (sw *Switch) addPeer(p Peer) error {
 	if err := sw.filterPeer(p); err != nil {
 		return err
@@ -636,11 +645,15 @@ func (sw *Switch) addPeer(p Peer) error {
 
 	p.SetLogger(sw.Logger.With("peer", p.NodeInfo().NetAddress()))
 
-	// All good. Start peer
+	// Handle the shutdown case where the switch has stopped, but we're
+	// concurrently trying to add a peer.
 	if sw.IsRunning() {
+		// All good. Start peer
 		if err := sw.startInitPeer(p); err != nil {
 			return err
 		}
+	} else {
+		sw.Logger.Error("Won't start a peer - switch is not running", "peer", p)
 	}
 
 	// Add the peer to .peers.
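
All of the switch changes above follow one pattern: every error path that used to call only `p.Stop()` now calls `sw.transport.Cleanup(p)` first, so the underlying connection (and its file descriptor) is released even if the peer was never started. Below is a minimal, self-contained Go sketch of that pattern; the `Peer`/`Transport` interfaces here are stand-ins for the real ones (which carry many more methods), and all concrete names are illustrative, not part of this patch.

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-ins for p2p.Peer and p2p.Transport (illustrative only).
type Peer interface {
	IsRunning() bool
	Stop() error
}

type Transport interface {
	// Cleanup closes the peer's original connection and removes it from
	// the transport's connection set.
	Cleanup(Peer)
}

type fakePeer struct{ running bool }

func (p *fakePeer) IsRunning() bool { return p.running }
func (p *fakePeer) Stop() error     { p.running = false; return nil }

type fakeTransport struct{}

func (fakeTransport) Cleanup(Peer) { fmt.Println("conn closed, addr removed from conn set") }

// addPeerOrCleanup mirrors the error path this diff adds to the Switch: on
// failure, release transport resources first (closing the file descriptor),
// then stop the peer only if it was actually started.
func addPeerOrCleanup(t Transport, p Peer, add func(Peer) error) error {
	if err := add(p); err != nil {
		t.Cleanup(p)
		if p.IsRunning() {
			_ = p.Stop()
		}
		return err
	}
	return nil
}

func main() {
	p := &fakePeer{running: false} // rejected before start, e.g. by a filter
	err := addPeerOrCleanup(fakeTransport{}, p, func(Peer) error {
		return errors.New("peer filtered out")
	})
	fmt.Println("addPeerOrCleanup:", err)
}
```

Guarding `Stop()` with `IsRunning()` matters because `addPeer` can reject a peer before it is started (e.g. when it is filtered out); in that case there is nothing to stop, but the connection still has to be closed.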
diff --git a/p2p/switch_test.go b/p2p/switch_test.go
index 6c515be0..35866161 100644
--- a/p2p/switch_test.go
+++ b/p2p/switch_test.go
@@ -3,7 +3,9 @@ package p2p
 import (
 	"bytes"
 	"fmt"
+	"io"
 	"io/ioutil"
+	"net"
 	"net/http"
 	"net/http/httptest"
 	"regexp"
@@ -13,7 +15,6 @@ import (
 	"time"
 
 	stdprometheus "github.com/prometheus/client_golang/prometheus"
-
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
@@ -477,6 +478,58 @@ func TestSwitchFullConnectivity(t *testing.T) {
 	}
 }
 
+func TestSwitchAcceptRoutine(t *testing.T) {
+	cfg.MaxNumInboundPeers = 5
+
+	// make switch
+	sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
+	err := sw.Start()
+	require.NoError(t, err)
+	defer sw.Stop()
+
+	remotePeers := make([]*remotePeer, 0)
+	assert.Equal(t, 0, sw.Peers().Size())
+
+	// 1. check we connect up to MaxNumInboundPeers
+	for i := 0; i < cfg.MaxNumInboundPeers; i++ {
+		rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
+		remotePeers = append(remotePeers, rp)
+		rp.Start()
+		c, err := rp.Dial(sw.NodeInfo().NetAddress())
+		require.NoError(t, err)
+		// spawn a reading routine to prevent connection from closing
+		go func(c net.Conn) {
+			for {
+				one := make([]byte, 1)
+				_, err := c.Read(one)
+				if err != nil {
+					return
+				}
+			}
+		}(c)
+	}
+	time.Sleep(10 * time.Millisecond)
+	assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size())
+
+	// 2. check we close new connections if we already have MaxNumInboundPeers peers
+	rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
+	rp.Start()
+	conn, err := rp.Dial(sw.NodeInfo().NetAddress())
+	require.NoError(t, err)
+	// check conn is closed
+	one := make([]byte, 1)
+	conn.SetReadDeadline(time.Now().Add(10 * time.Millisecond))
+	_, err = conn.Read(one)
+	assert.Equal(t, io.EOF, err)
+	assert.Equal(t, cfg.MaxNumInboundPeers, sw.Peers().Size())
+	rp.Stop()
+
+	// stop remote peers
+	for _, rp := range remotePeers {
+		rp.Stop()
+	}
+}
+
 func BenchmarkSwitchBroadcast(b *testing.B) {
 	s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch {
 		// Make bar reactors of bar channels each
diff --git a/p2p/test_util.go b/p2p/test_util.go
index ea788b79..04629fca 100644
--- a/p2p/test_util.go
+++ b/p2p/test_util.go
@@ -247,17 +247,35 @@ func testNodeInfo(id ID, name string) NodeInfo {
 }
 
 func testNodeInfoWithNetwork(id ID, name, network string) NodeInfo {
+	port, err := getFreePort()
+	if err != nil {
+		panic(err)
+	}
 	return DefaultNodeInfo{
 		ProtocolVersion: defaultProtocolVersion,
 		ID_:             id,
-		ListenAddr:      fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023),
+		ListenAddr:      fmt.Sprintf("127.0.0.1:%d", port),
 		Network:         network,
 		Version:         "1.2.3-rc0-deadbeef",
 		Channels:        []byte{testCh},
 		Moniker:         name,
 		Other: DefaultNodeInfoOther{
 			TxIndex:    "on",
-			RPCAddress: fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023),
+			RPCAddress: fmt.Sprintf("127.0.0.1:%d", port),
 		},
 	}
 }
+
+func getFreePort() (int, error) {
+	addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
+	if err != nil {
+		return 0, err
+	}
+
+	l, err := net.ListenTCP("tcp", addr)
+	if err != nil {
+		return 0, err
+	}
+	defer l.Close()
+	return l.Addr().(*net.TCPAddr).Port, nil
+}
diff --git a/p2p/transport.go b/p2p/transport.go
index 69fab312..2d4420a1 100644
--- a/p2p/transport.go
+++ b/p2p/transport.go
@@ -52,6 +52,9 @@ type Transport interface {
 
 	// Dial connects to the Peer for the address.
 	Dial(NetAddress, peerConfig) (Peer, error)
+
+	// Cleanup any resources associated with Peer.
+	Cleanup(Peer)
 }
 
 // transportLifecycle bundles the methods for callers to control start and stop
@@ -274,6 +277,13 @@ func (mt *MultiplexTransport) acceptPeers() {
 	}
 }
 
+// Cleanup removes the given address from the connections set and
+// closes the connection.
+func (mt *MultiplexTransport) Cleanup(peer Peer) {
+	mt.conns.RemoveAddr(peer.RemoteAddr())
+	_ = peer.CloseConn()
+}
+
 func (mt *MultiplexTransport) cleanup(c net.Conn) error {
 	mt.conns.Remove(c)
 
@@ -418,12 +428,6 @@ func (mt *MultiplexTransport) wrapPeer(
 		PeerMetrics(cfg.metrics),
 	)
 
-	// Wait for Peer to Stop so we can cleanup.
-	go func(c net.Conn) {
-		<-p.Quit()
-		_ = mt.cleanup(c)
-	}(c)
-
 	return p
 }
 
diff --git a/version/version.go b/version/version.go
index 87d81c6f..9d05afe6 100644
--- a/version/version.go
+++ b/version/version.go
@@ -18,7 +18,7 @@ const (
 	// TMCoreSemVer is the current version of Tendermint Core.
 	// It's the Semantic Version of the software.
 	// Must be a string because scripts like dist.sh read this file.
-	TMCoreSemVer = "0.29.0"
+	TMCoreSemVer = "0.29.1"
 
 	// ABCISemVer is the semantic version of the ABCI library
 	ABCISemVer = "0.15.0"
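
As a closing reference for the ADR-033 part of this patch, here is a self-contained sketch of the proposed consumption pattern. Only the `Out()`/`Cancelled()`/`Err()` shape comes from the ADR; the toy `Subscription` implementation, the buffered-channel setup, and the "out of capacity" error text are assumptions made for this example, not the library's actual API.

```go
package main

import (
	"errors"
	"fmt"
)

// MsgAndTags mirrors the struct from the ADR; TagMap is simplified to a
// plain map here.
type MsgAndTags struct {
	Msg  interface{}
	Tags map[string]string
}

// Subscription is a toy version of the proposed type; the real one would
// live in libs/pubsub and be created by Subscribe().
type Subscription struct {
	out       chan MsgAndTags
	cancelled chan struct{}
	err       error
}

func (s *Subscription) Out() <-chan MsgAndTags     { return s.out }
func (s *Subscription) Cancelled() <-chan struct{} { return s.cancelled }
func (s *Subscription) Err() error                 { return s.err }

func main() {
	s := &Subscription{
		out:       make(chan MsgAndTags, 1), // buffered with cap 1, the proposed default
		cancelled: make(chan struct{}),
	}

	// A publisher delivers one message, then terminates the subscription
	// (e.g. because the subscriber is too slow). Err must be set before
	// Cancelled is closed so readers observe it afterwards.
	s.out <- MsgAndTags{Msg: "NewBlock", Tags: map[string]string{"tm.event": "NewBlock"}}
	s.err = errors.New("out of capacity")
	close(s.cancelled)

	// The consumption loop from the ADR: drain Out() until Cancelled() is
	// closed, then inspect Err() for the termination reason. Note that when
	// both channels are ready, select picks one at random, so a real client
	// may see the cancellation before the last buffered message.
	for {
		select {
		case mt := <-s.Out():
			fmt.Printf("got %v with tags %v\n", mt.Msg, mt.Tags)
		case <-s.Cancelled():
			fmt.Println("subscription terminated:", s.Err())
			return
		}
	}
}
```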