added pool tests

This commit is contained in:
Anca Zamfir
2019-03-27 19:54:08 +01:00
parent 6b109881c6
commit 70c703860e
6 changed files with 585 additions and 63 deletions

View File

@ -190,7 +190,7 @@ func TestBadBlockStopsPeer(t *testing.T) {
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(148)
maxBlockHeight := int64(500)
otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
defer func() {
@ -293,7 +293,7 @@ func setupReactors(
func TestFastSyncMultiNode(t *testing.T) {
numNodes := 8
maxHeight := int64(2000)
maxHeight := int64(1000)
config = cfg.ResetTestRoot("blockchain_reactor_test")
genDoc, privVals := randGenesisDoc(1, false, 30)

View File

@ -127,3 +127,7 @@ func (peer *bpPeer) cleanup() {
peer.timeout.Stop()
}
}
func (peer *bpPeer) String() string {
return fmt.Sprintf("peer: %v height: %v pending: %v", peer.id, peer.height, peer.numPending)
}

View File

@ -41,11 +41,29 @@ func newBlockPool(height int64) *blockPool {
}
}
func (pool *blockPool) String() string {
peerStr := fmt.Sprintf("Pool Peers:")
for _, p := range pool.peers {
peerStr += fmt.Sprintf("%v,", p)
}
return peerStr
}
func (pool *blockPool) setLogger(l log.Logger) {
pool.logger = l
}
// Sets the peer's blockchain height.
func (pool blockPool) getMaxPeerHeight() int64 {
return pool.maxPeerHeight
}
func (pool *blockPool) reachedMaxHeight() bool {
return pool.height >= pool.maxPeerHeight
}
// Adds a new peer or updates an existing peer with a new height.
// If the peer is too short it is removed
// Should change function name??
func (pool *blockPool) updatePeer(peerID p2p.ID, height int64, errFunc func(err error, peerID p2p.ID)) error {
peer := pool.peers[peerID]
@ -74,52 +92,54 @@ func (pool *blockPool) updatePeer(peerID p2p.ID, height int64, errFunc func(err
}
}
oldH := peer.height
pool.logger.Info("setting peer height to", "peer", peerID, "height", height)
peer.height = height
if height > pool.maxPeerHeight {
pool.maxPeerHeight = height
if oldH == pool.maxPeerHeight && height < pool.maxPeerHeight {
// peer was at max height, update max if not other high peers
pool.updateMaxPeerHeight()
}
if height > pool.maxPeerHeight {
// peer increased height over maxPeerHeight
pool.maxPeerHeight = height
pool.logger.Info("setting maxPeerHeight", "max", pool.maxPeerHeight)
}
return nil
}
func (pool blockPool) getMaxPeerHeight() int64 {
return pool.maxPeerHeight
}
// called from:
// - the switch from its go routing
// - when peer times out from the timer go routine.
// Send message to FSM
func (fsm *bReactorFSM) processPeerError(err error, peerID p2p.ID) {
msgData := bReactorMessageData{
event: peerErrEv,
data: bReactorEventData{
err: err,
peerId: peerID,
},
}
sendMessageToFSM(fsm, msgData)
}
func (pool *blockPool) reachedMaxHeight() bool {
return pool.height >= pool.maxPeerHeight
}
// called every time FSM advances its height
func (pool *blockPool) removeShortPeers() {
// If no peers are left, maxPeerHeight is set to 0.
func (pool *blockPool) updateMaxPeerHeight() {
var max int64
for _, peer := range pool.peers {
if peer.height < pool.height {
pool.logger.Info("removeShortPeers", "peer", peer.id)
pool.removePeer(peer.id, nil)
if peer.height > max {
max = peer.height
}
}
pool.maxPeerHeight = max
}
// stops the peer timer and deletes the peer
// Stops the peer timer and deletes the peer. Recomputes the max peer height
func (pool *blockPool) deletePeer(peerID p2p.ID) {
if p, exist := pool.peers[peerID]; exist && p.timeout != nil {
p, exist := pool.peers[peerID]
if !exist {
return
}
if p.timeout != nil {
p.timeout.Stop()
}
delete(pool.peers, peerID)
if p.height == pool.maxPeerHeight {
pool.updateMaxPeerHeight()
}
}
// removes any blocks and requests associated with the peer, deletes the peer and informs the switch if needed.
@ -135,12 +155,14 @@ func (pool *blockPool) removePeer(peerID p2p.ID, err error) {
}
// delete peer
pool.deletePeer(peerID)
}
// recompute maxPeerHeight
pool.maxPeerHeight = 0
// called every time FSM advances its height
func (pool *blockPool) removeShortPeers() {
for _, peer := range pool.peers {
if peer.height > pool.maxPeerHeight {
pool.maxPeerHeight = peer.height
if peer.height < pool.height {
pool.logger.Info("removeShortPeers", "peer", peer.id)
pool.removePeer(peer.id, nil)
}
}
}
@ -230,7 +252,7 @@ func (pool *blockPool) sendRequestBatch(sendFunc func(peerID p2p.ID, height int6
// request height
height := pool.height + int64(i)
if height > pool.maxPeerHeight {
pool.logger.Debug("Will not send request for", "height", height)
pool.logger.Debug("Will not send request for", "height", height, "max", pool.maxPeerHeight)
return err
}
req := pool.blocks[height]
@ -241,7 +263,21 @@ func (pool *blockPool) sendRequestBatch(sendFunc func(peerID p2p.ID, height int6
// couldn't find a good peer or couldn't communicate with it
continue
}
_ = sendFunc(peerId, height)
pool.logger.Debug("Try to send request to peer", "peer", peerId, "height", height)
err = sendFunc(peerId, height)
if err == errSendQueueFull {
pool.logger.Error("cannot send request, queue full", "peer", peerId, "height", height)
continue
}
if err == errNilPeerForBlockRequest {
// this peer does not exist in the switch, delete locally
pool.logger.Error("peer doesn't exist in the switch", "peer", peerId)
pool.removePeer(peerId, errNilPeerForBlockRequest)
continue
}
pool.logger.Debug("Sent request to peer", "peer", peerId, "height", height)
}
}
return nil
@ -250,31 +286,21 @@ func (pool *blockPool) sendRequestBatch(sendFunc func(peerID p2p.ID, height int6
func (pool *blockPool) getBestPeer(height int64) (peerId p2p.ID, err error) {
// make requests
// TODO - sort peers in order of goodness
pool.logger.Debug("try to send request for", "height", height)
pool.logger.Debug("try to find peer for", "height", height)
for _, peer := range pool.peers {
// Send Block Request message to peer
if peer.height < height {
continue
}
pool.logger.Debug("Try to send request to peer", "peer", peer.id, "height", height)
if err == errSendQueueFull {
pool.logger.Error("cannot send request, queue full", "peer", peer.id, "height", height)
if peer.numPending > int32(maxRequestBatchSize/len(pool.peers)) {
continue
}
if err == errNilPeerForBlockRequest {
// this peer does not exist in the switch, delete locally
pool.logger.Error("peer doesn't exist in the switch", "peer", peer.id)
pool.deletePeer(peer.id)
continue
}
pool.logger.Debug("Sent request to peer", "peer", peer.id, "height", height)
// reserve space for block
pool.blocks[height] = &blockData{peerId: peer.id, block: nil}
pool.peers[peer.id].incrPending()
return peer.id, nil
}
pool.logger.Debug("List of peers", "peers", pool.peers)
return "", errNoPeerFoundForRequest
}

466
blockchain_new/pool_test.go Normal file
View File

@ -0,0 +1,466 @@
package blockchain_new
import (
"github.com/stretchr/testify/assert"
"reflect"
"testing"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
type fields struct {
logger log.Logger
peers map[p2p.ID]*bpPeer
blocks map[int64]*blockData
height int64
maxPeerHeight int64
}
type testPeer struct {
id p2p.ID
height int64
}
func testErrFunc(err error, peerID p2p.ID) {
}
func makeUpdateFields(log log.Logger, height int64, peers []testPeer, generateBlocks bool) fields {
ufields := fields{
logger: log,
height: height,
peers: make(map[p2p.ID]*bpPeer),
blocks: make(map[int64]*blockData),
}
var maxH int64
for _, p := range peers {
if p.height > maxH {
maxH = p.height
}
ufields.peers[p.id] = newBPPeer(p.id, p.height, testErrFunc)
}
ufields.maxPeerHeight = maxH
return ufields
}
func poolCopy(pool *blockPool) *blockPool {
return &blockPool{
peers: peersCopy(pool.peers),
logger: pool.logger,
blocks: blocksCopy(pool.blocks),
height: pool.height,
maxPeerHeight: pool.maxPeerHeight,
}
}
func blocksCopy(blocks map[int64]*blockData) map[int64]*blockData {
blockCopy := make(map[int64]*blockData)
for _, b := range blocks {
blockCopy[b.block.Height] = &blockData{peerId: b.peerId, block: b.block}
}
return blockCopy
}
func peersCopy(peers map[p2p.ID]*bpPeer) map[p2p.ID]*bpPeer {
peerCopy := make(map[p2p.ID]*bpPeer)
for _, p := range peers {
peerCopy[p.id] = newBPPeer(p.id, p.height, p.errFunc)
}
return peerCopy
}
func TestBlockPoolUpdatePeer(t *testing.T) {
l := log.TestingLogger()
type args struct {
peerID p2p.ID
height int64
errFunc func(err error, peerID p2p.ID)
}
tests := []struct {
name string
fields fields
args args
errWanted error
addWanted bool
delWanted bool
maxHeightWanted int64
}{
{
name: "add a first short peer",
fields: makeUpdateFields(l, 100, []testPeer{}, false),
args: args{"P1", 50, func(err error, peerId p2p.ID) {}},
errWanted: errPeerTooShort,
maxHeightWanted: int64(0),
},
{
name: "add a first good peer",
fields: makeUpdateFields(l, 100, []testPeer{}, false),
args: args{"P1", 101, func(err error, peerId p2p.ID) {}},
addWanted: true,
maxHeightWanted: int64(101),
},
{
name: "increase the height of P1 from 120 to 123",
fields: makeUpdateFields(l, 100, []testPeer{{"P1", 120}}, false),
args: args{"P1", 123, func(err error, peerId p2p.ID) {}},
maxHeightWanted: int64(123),
},
{
name: "decrease the height of P1 from 120 to 110",
fields: makeUpdateFields(l, 100, []testPeer{{"P1", 120}}, false),
args: args{"P1", 110, func(err error, peerId p2p.ID) {}},
maxHeightWanted: int64(110),
},
{
name: "decrease the height of P1 from 120 to 90",
fields: makeUpdateFields(l, 100, []testPeer{{"P1", 120}}, false),
args: args{"P1", 90, func(err error, peerId p2p.ID) {}},
delWanted: true,
errWanted: errPeerTooShort,
maxHeightWanted: int64(0),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pool := &blockPool{
logger: tt.fields.logger,
peers: tt.fields.peers,
blocks: tt.fields.blocks,
height: tt.fields.height,
maxPeerHeight: tt.fields.maxPeerHeight,
}
beforePool := poolCopy(pool)
err := pool.updatePeer(tt.args.peerID, tt.args.height, tt.args.errFunc)
if err != tt.errWanted {
t.Errorf("blockPool.updatePeer() error = %v, wantErr %v", err, tt.errWanted)
}
if tt.errWanted != nil {
// error case
if tt.delWanted {
assert.Equal(t, len(beforePool.peers)-1, len(pool.peers))
return
}
assert.Equal(t, beforePool, pool)
return
}
if tt.addWanted {
// add case only
assert.Equal(t, len(beforePool.peers)+1, len(pool.peers))
} else {
// update case only
assert.Equal(t, len(beforePool.peers), len(pool.peers))
}
// both add and update
assert.Equal(t, pool.peers[tt.args.peerID].height, tt.args.height)
assert.Equal(t, tt.maxHeightWanted, pool.maxPeerHeight)
})
}
}
func TestBlockPoolRemovePeer(t *testing.T) {
type args struct {
peerID p2p.ID
err error
}
l := log.TestingLogger()
tests := []struct {
name string
fields fields
args args
maxHeightWanted int64
}{
{
name: "attempt to delete non-existing peer",
fields: makeUpdateFields(l, 100, []testPeer{{"P1", 120}}, false),
args: args{"P99", nil},
maxHeightWanted: int64(120),
},
{
name: "delete the only peer",
fields: makeUpdateFields(l, 100, []testPeer{{"P1", 120}}, false),
args: args{"P1", nil},
maxHeightWanted: int64(0),
},
{
name: "delete shorter of two peers",
fields: makeUpdateFields(l, 100, []testPeer{{"P1", 100}, {"P2", 120}}, false),
args: args{"P1", nil},
maxHeightWanted: int64(120),
},
{
name: "delete taller of two peers",
fields: makeUpdateFields(l, 100, []testPeer{{"P1", 100}, {"P2", 120}}, false),
args: args{"P2", nil},
maxHeightWanted: int64(100),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pool := &blockPool{
logger: tt.fields.logger,
peers: tt.fields.peers,
blocks: tt.fields.blocks,
height: tt.fields.height,
maxPeerHeight: tt.fields.maxPeerHeight,
}
pool.removePeer(tt.args.peerID, tt.args.err)
assert.Equal(t, tt.maxHeightWanted, pool.maxPeerHeight)
_, ok := pool.peers[tt.args.peerID]
assert.False(t, ok)
})
}
}
func TestBlockPoolRemoveShortPeers(t *testing.T) {
l := log.TestingLogger()
tests := []struct {
name string
fields fields
maxHeightWanted int64
noChange bool
}{
{
name: "no short peers",
fields: makeUpdateFields(l, 100,
[]testPeer{{"P1", 100}, {"P2", 110}, {"P3", 120}},
false),
maxHeightWanted: int64(120),
noChange: true,
},
{
name: "one short peers",
fields: makeUpdateFields(l, 100,
[]testPeer{{"P1", 100}, {"P2", 90}, {"P3", 120}},
false),
maxHeightWanted: int64(120),
},
{
name: "all short peers",
fields: makeUpdateFields(l, 100,
[]testPeer{{"P1", 90}, {"P2", 91}, {"P3", 92}},
false),
maxHeightWanted: int64(0),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pool := &blockPool{
logger: tt.fields.logger,
peers: tt.fields.peers,
blocks: tt.fields.blocks,
height: tt.fields.height,
maxPeerHeight: tt.fields.maxPeerHeight,
}
beforePool := poolCopy(pool)
pool.removeShortPeers()
assert.Equal(t, tt.maxHeightWanted, pool.maxPeerHeight)
if tt.noChange {
assert.Equal(t, len(beforePool.peers), len(pool.peers))
return
}
for _, peer := range tt.fields.peers {
bPeer, bok := beforePool.peers[peer.id]
if bok && bPeer.height < beforePool.height {
_, ok := pool.peers[peer.id]
assert.False(t, ok)
}
}
})
}
}
func TestBlockPoolAddBlock(t *testing.T) {
type args struct {
peerID p2p.ID
block *types.Block
blockSize int
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pool := &blockPool{
logger: tt.fields.logger,
peers: tt.fields.peers,
blocks: tt.fields.blocks,
height: tt.fields.height,
maxPeerHeight: tt.fields.maxPeerHeight,
}
if err := pool.addBlock(tt.args.peerID, tt.args.block, tt.args.blockSize); (err != nil) != tt.wantErr {
t.Errorf("blockPool.addBlock() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestBlockPoolGetNextTwoBlocks(t *testing.T) {
tests := []struct {
name string
fields fields
wantFirst *blockData
wantSecond *blockData
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pool := &blockPool{
logger: tt.fields.logger,
peers: tt.fields.peers,
blocks: tt.fields.blocks,
height: tt.fields.height,
maxPeerHeight: tt.fields.maxPeerHeight,
}
gotFirst, gotSecond, err := pool.getNextTwoBlocks()
if (err != nil) != tt.wantErr {
t.Errorf("blockPool.getNextTwoBlocks() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotFirst, tt.wantFirst) {
t.Errorf("blockPool.getNextTwoBlocks() gotFirst = %v, want %v", gotFirst, tt.wantFirst)
}
if !reflect.DeepEqual(gotSecond, tt.wantSecond) {
t.Errorf("blockPool.getNextTwoBlocks() gotSecond = %v, want %v", gotSecond, tt.wantSecond)
}
})
}
}
func TestBlockPoolInvalidateFirstTwoBlocks(t *testing.T) {
type args struct {
err error
}
tests := []struct {
name string
fields fields
args args
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pool := &blockPool{
logger: tt.fields.logger,
peers: tt.fields.peers,
blocks: tt.fields.blocks,
height: tt.fields.height,
maxPeerHeight: tt.fields.maxPeerHeight,
}
pool.invalidateFirstTwoBlocks(tt.args.err)
})
}
}
func TestBlockPoolProcessedCurrentHeightBlock(t *testing.T) {
tests := []struct {
name string
fields fields
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pool := &blockPool{
logger: tt.fields.logger,
peers: tt.fields.peers,
blocks: tt.fields.blocks,
height: tt.fields.height,
maxPeerHeight: tt.fields.maxPeerHeight,
}
pool.processedCurrentHeightBlock()
})
}
}
func TestBlockPoolSendRequestBatch(t *testing.T) {
type args struct {
sendFunc func(peerID p2p.ID, height int64) error
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pool := &blockPool{
logger: tt.fields.logger,
peers: tt.fields.peers,
blocks: tt.fields.blocks,
height: tt.fields.height,
maxPeerHeight: tt.fields.maxPeerHeight,
}
if err := pool.sendRequestBatch(tt.args.sendFunc); (err != nil) != tt.wantErr {
t.Errorf("blockPool.sendRequestBatch() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func TestBlockPoolGetBestPeer(t *testing.T) {
type args struct {
height int64
}
tests := []struct {
name string
fields fields
args args
wantPeerId p2p.ID
wantErr bool
}{
// TODO: Add test cases.
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pool := &blockPool{
logger: tt.fields.logger,
peers: tt.fields.peers,
blocks: tt.fields.blocks,
height: tt.fields.height,
maxPeerHeight: tt.fields.maxPeerHeight,
}
gotPeerId, err := pool.getBestPeer(tt.args.height)
if (err != nil) != tt.wantErr {
t.Errorf("blockPool.getBestPeer() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(gotPeerId, tt.wantPeerId) {
t.Errorf("blockPool.getBestPeer() = %v, want %v", gotPeerId, tt.wantPeerId)
}
})
}
}

View File

@ -244,13 +244,20 @@ func init() {
// no blockResponse
// Should we send status request again? Switch to consensus?
// Note that any unresponsive peers have been already removed by their timer expiry handler.
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return finished, errNoPeerResponse
case statusResponseEv:
err := fsm.pool.updatePeer(data.peerId, data.height, fsm.processPeerError)
if len(fsm.pool.peers) == 0 {
_ = fsm.bcr.sendStatusRequest()
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForPeer, err
}
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForBlock, err
case blockResponseEv:
@ -265,6 +272,9 @@ func init() {
fsm.bcr.sendPeerError(err, data.peerId)
}
fsm.processSignalActive = false
if fsm.state.timer != nil {
fsm.state.timer.Stop()
}
return waitForBlock, err
}
@ -283,6 +293,7 @@ func init() {
// processed block, check if we are done
if fsm.pool.reachedMaxHeight() {
// TODO should we wait for more status responses in case a high peer is slow?
fsm.logger.Info("Switching to consensus!!!!")
fsm.bcr.switchToConsensus()
return finished, nil
}
@ -518,3 +529,18 @@ func (fsm *bReactorFSM) processBlock() {
func (fsm *bReactorFSM) IsFinished() bool {
return fsm.state == finished
}
// called from:
// - the switch from its go routing
// - when peer times out from the timer go routine.
// Send message to FSM
func (fsm *bReactorFSM) processPeerError(err error, peerID p2p.ID) {
msgData := bReactorMessageData{
event: peerErrEv,
data: bReactorEventData{
err: err,
peerId: peerID,
},
}
sendMessageToFSM(fsm, msgData)
}

View File

@ -126,7 +126,7 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals
func TestNoBlockResponse(t *testing.T) {
peerTimeout = 15 * time.Second
maxRequestBatchSize = 200
maxRequestBatchSize = 40
config = cfg.ResetTestRoot("blockchain_new_reactor_test")
defer os.RemoveAll(config.RootDir)
@ -198,12 +198,12 @@ func TestNoBlockResponse(t *testing.T) {
// that seems extreme.
func TestBadBlockStopsPeer(t *testing.T) {
peerTimeout = 15 * time.Second
maxRequestBatchSize = 32
maxRequestBatchSize = 40
config = cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(148)
maxBlockHeight := int64(500)
otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
defer func() {
@ -323,9 +323,9 @@ func setupReactors(
func TestFastSyncMultiNode(t *testing.T) {
numNodes := 8
maxHeight := int64(2000)
maxHeight := int64(1000)
peerTimeout = 15 * time.Second
maxRequestBatchSize = 128
maxRequestBatchSize = 80
config = cfg.ResetTestRoot("blockchain_reactor_test")
genDoc, privVals := randGenesisDoc(1, false, 30)
@ -380,7 +380,7 @@ func TestFastSyncMultiNode(t *testing.T) {
fmt.Println(time.Since(start))
assert.True(t, lastReactorPair.reactor.Switch.Peers().Size() < len(reactorPairs))
assert.Equal(t, lastReactorPair.reactor.fsm.pool.maxPeerHeight, lastReactorPair.reactor.fsm.pool.height)
assert.Equal(t, lastReactorPair.reactor.fsm.pool.getMaxPeerHeight(), lastReactorPair.reactor.fsm.pool.height)
}
//----------------------------------------------