mirror of
https://github.com/fluencelabs/tendermint
synced 2025-07-15 20:41:37 +00:00
Compare commits
3 Commits
v0.26.0-de
...
v0.19.6-rc
Author | SHA1 | Date | |
---|---|---|---|
|
667fbaee23 | ||
|
8e1e2bd10a | ||
|
9671c0d4fe |
@@ -2,13 +2,18 @@
|
||||
|
||||
## 0.19.6
|
||||
|
||||
*TBD*
|
||||
|
||||
IMPROVEMENTS:
|
||||
|
||||
- [consensus] consensus reactor now receives events from a separate event bus,
|
||||
- [consensus] Consensus reactor now receives events from a separate synchronous event bus,
|
||||
which is not dependant on external RPC load
|
||||
|
||||
BUG FIX:
|
||||
|
||||
- [evidence] Dont send peers evidence from heights they haven't synced to yet
|
||||
- [p2p] Refuse connections to more than one peer with the same IP
|
||||
- [docs] Various fixes
|
||||
|
||||
## 0.19.5
|
||||
|
||||
*May 20th, 2018*
|
||||
|
@@ -133,7 +133,7 @@ func (pool *BlockPool) removeTimedoutPeers() {
|
||||
// curRate can be 0 on start
|
||||
if curRate != 0 && curRate < minRecvRate {
|
||||
err := errors.New("peer is not sending us data fast enough")
|
||||
pool.sendError(err, peer.id)
|
||||
go pool.sendError(err, peer.id)
|
||||
pool.Logger.Error("SendTimeout", "peer", peer.id,
|
||||
"reason", err,
|
||||
"curRate", fmt.Sprintf("%d KB/s", curRate/1024),
|
||||
@@ -232,14 +232,16 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
|
||||
defer pool.mtx.Unlock()
|
||||
|
||||
requester := pool.requesters[block.Height]
|
||||
poolHeight := pool.height
|
||||
|
||||
if requester == nil {
|
||||
pool.Logger.Info("peer sent us a block we didn't expect", "peer", peerID, "curHeight", pool.height, "blockHeight", block.Height)
|
||||
diff := pool.height - block.Height
|
||||
pool.Logger.Info("peer sent us a block we didn't expect", "peer", peerID, "curHeight", poolHeight, "blockHeight", block.Height)
|
||||
diff := poolHeight - block.Height
|
||||
if diff < 0 {
|
||||
diff *= -1
|
||||
}
|
||||
if diff > maxDiffBetweenCurrentAndReceivedBlockHeight {
|
||||
pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID)
|
||||
go pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID)
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -247,6 +249,7 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
|
||||
if requester.setBlock(block, peerID) {
|
||||
pool.numPending--
|
||||
peer := pool.peers[peerID]
|
||||
|
||||
if peer != nil {
|
||||
peer.decrPending(blockSize)
|
||||
}
|
||||
@@ -292,7 +295,8 @@ func (pool *BlockPool) removePeer(peerID p2p.ID) {
|
||||
for _, requester := range pool.requesters {
|
||||
if requester.getPeerID() == peerID {
|
||||
if requester.getBlock() != nil {
|
||||
pool.numPending++
|
||||
continue
|
||||
// pool.numPending++
|
||||
}
|
||||
go requester.redo() // pick another peer and ...
|
||||
}
|
||||
@@ -443,7 +447,7 @@ func (peer *bpPeer) onTimeout() {
|
||||
defer peer.pool.mtx.Unlock()
|
||||
|
||||
err := errors.New("peer did not send us anything")
|
||||
peer.pool.sendError(err, peer.id)
|
||||
go peer.pool.sendError(err, peer.id)
|
||||
peer.logger.Error("SendTimeout", "reason", err, "timeout", peerTimeout)
|
||||
peer.didTimeout = true
|
||||
}
|
||||
@@ -517,7 +521,14 @@ func (bpr *bpRequester) reset() {
|
||||
// Tells bpRequester to pick another peer and try again.
|
||||
// NOTE: blocking
|
||||
func (bpr *bpRequester) redo() {
|
||||
bpr.redoCh <- struct{}{}
|
||||
select {
|
||||
case bpr.redoCh <- struct{}{}:
|
||||
case <-bpr.Quit():
|
||||
return
|
||||
case <-bpr.pool.Quit():
|
||||
bpr.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Responsible for making more requests as necessary
|
||||
@@ -556,17 +567,8 @@ OUTER_LOOP:
|
||||
bpr.reset()
|
||||
continue OUTER_LOOP // When peer is removed
|
||||
case <-bpr.gotBlockCh:
|
||||
// We got the block, now see if it's good.
|
||||
select {
|
||||
case <-bpr.pool.Quit():
|
||||
bpr.Stop()
|
||||
return
|
||||
case <-bpr.Quit():
|
||||
return
|
||||
case <-bpr.redoCh:
|
||||
bpr.reset()
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
bpr.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/tendermint/go-amino"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
|
||||
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
@@ -118,21 +119,48 @@ func (evR *EvidenceReactor) broadcastRoutine() {
|
||||
case evidence := <-evR.evpool.EvidenceChan():
|
||||
// broadcast some new evidence
|
||||
msg := &EvidenceListMessage{[]types.Evidence{evidence}}
|
||||
evR.Switch.Broadcast(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
|
||||
evR.broadcastEvidenceListMsg(msg)
|
||||
|
||||
// TODO: Broadcast runs asynchronously, so this should wait on the successChan
|
||||
// in another routine before marking to be proper.
|
||||
// TODO: the broadcast here is just doing TrySend.
|
||||
// We should make sure the send succeeds before marking broadcasted.
|
||||
evR.evpool.evidenceStore.MarkEvidenceAsBroadcasted(evidence)
|
||||
case <-ticker.C:
|
||||
// broadcast all pending evidence
|
||||
msg := &EvidenceListMessage{evR.evpool.PendingEvidence()}
|
||||
evR.Switch.Broadcast(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
|
||||
evR.broadcastEvidenceListMsg(msg)
|
||||
case <-evR.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (evR *EvidenceReactor) broadcastEvidenceListMsg(msg *EvidenceListMessage) {
|
||||
// NOTE: we dont send evidence to peers higher than their height,
|
||||
// because they can't validate it (don't have validators from the height).
|
||||
// So, for now, only send the `msg` to peers synced to the highest height in the list.
|
||||
// TODO: send each peer all the evidence below its current height -
|
||||
// might require a routine per peer, like the mempool.
|
||||
|
||||
var maxHeight int64
|
||||
for _, ev := range msg.Evidence {
|
||||
if ev.Height() > maxHeight {
|
||||
maxHeight = ev.Height()
|
||||
}
|
||||
}
|
||||
|
||||
for _, peer := range evR.Switch.Peers().List() {
|
||||
ps := peer.Get(types.PeerStateKey).(PeerState)
|
||||
rs := ps.GetRoundState()
|
||||
if rs.Height >= maxHeight {
|
||||
peer.TrySend(EvidenceChannel, cdc.MustMarshalBinaryBare(msg))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type PeerState interface {
|
||||
GetRoundState() *cstypes.PeerRoundState
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Messages
|
||||
|
||||
|
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||
"github.com/tendermint/tendermint/p2p"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
@@ -130,8 +131,50 @@ func TestReactorBroadcastEvidence(t *testing.T) {
|
||||
// make reactors from statedb
|
||||
reactors := makeAndConnectEvidenceReactors(config, stateDBs)
|
||||
|
||||
// set the peer height on each reactor
|
||||
for _, r := range reactors {
|
||||
for _, peer := range r.Switch.Peers().List() {
|
||||
ps := peerState{height}
|
||||
peer.Set(types.PeerStateKey, ps)
|
||||
}
|
||||
}
|
||||
|
||||
// send a bunch of valid evidence to the first reactor's evpool
|
||||
// and wait for them all to be received in the others
|
||||
evList := sendEvidence(t, reactors[0].evpool, valAddr, NUM_EVIDENCE)
|
||||
waitForEvidence(t, evList, reactors)
|
||||
}
|
||||
|
||||
type peerState struct {
|
||||
height int64
|
||||
}
|
||||
|
||||
func (ps peerState) GetRoundState() *cstypes.PeerRoundState {
|
||||
return &cstypes.PeerRoundState{
|
||||
Height: ps.height,
|
||||
}
|
||||
}
|
||||
|
||||
func TestReactorSelectiveBroadcast(t *testing.T) {
|
||||
config := cfg.TestConfig()
|
||||
|
||||
valAddr := []byte("myval")
|
||||
height1 := int64(NUM_EVIDENCE) + 10
|
||||
height2 := int64(NUM_EVIDENCE) / 2
|
||||
|
||||
// DB1 is ahead of DB2
|
||||
stateDB1 := initializeValidatorState(valAddr, height1)
|
||||
stateDB2 := initializeValidatorState(valAddr, height2)
|
||||
|
||||
// make reactors from statedb
|
||||
reactors := makeAndConnectEvidenceReactors(config, []dbm.DB{stateDB1, stateDB2})
|
||||
peer := reactors[0].Switch.Peers().List()[0]
|
||||
ps := peerState{height2}
|
||||
peer.Set(types.PeerStateKey, ps)
|
||||
|
||||
// send a bunch of valid evidence to the first reactor's evpool
|
||||
evList := sendEvidence(t, reactors[0].evpool, valAddr, NUM_EVIDENCE)
|
||||
|
||||
// only ones less than the peers height should make it through
|
||||
waitForEvidence(t, evList[:NUM_EVIDENCE/2], reactors[1:2])
|
||||
}
|
||||
|
@@ -164,7 +164,7 @@ func (store *EvidenceStore) MarkEvidenceAsBroadcasted(evidence types.Evidence) {
|
||||
store.db.Delete(key)
|
||||
}
|
||||
|
||||
// MarkEvidenceAsPending removes evidence from pending and outqueue and sets the state to committed.
|
||||
// MarkEvidenceAsCommitted removes evidence from pending and outqueue and sets the state to committed.
|
||||
func (store *EvidenceStore) MarkEvidenceAsCommitted(evidence types.Evidence) {
|
||||
// if its committed, its been broadcast
|
||||
store.MarkEvidenceAsBroadcasted(evidence)
|
||||
|
@@ -106,7 +106,7 @@ func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block
|
||||
|
||||
fail.Fail() // XXX
|
||||
|
||||
// Update evpool now that state is saved
|
||||
// Update evpool now that state is saved.
|
||||
// TODO: handle the crash/recover scenario
|
||||
// ie. (may need to call Update for last block)
|
||||
blockExec.evpool.Update(block)
|
||||
|
Reference in New Issue
Block a user