Added timeout on next block needed to advance

This commit is contained in:
Anca Zamfir
2019-04-28 22:03:26 -04:00
parent 6cf8035ed0
commit 8ef2f6b5f2
6 changed files with 464 additions and 297 deletions

View File

@ -34,7 +34,7 @@ type blockPool struct {
blocks map[int64]p2p.ID blocks map[int64]p2p.ID
requests map[int64]bool // list of blocks to be assigned peers for blockRequest requests map[int64]bool // list of blocks to be assigned peers for blockRequest
nextRequestHeight int64 // next request to be added to requests nextRequestHeight int64 // next height to be added to requests
height int64 // processing height height int64 // processing height
maxPeerHeight int64 // maximum height of all peers maxPeerHeight int64 // maximum height of all peers
@ -224,15 +224,14 @@ func (pool *blockPool) makeNextRequests(maxNumPendingRequests int32) {
for _, height := range heights { for _, height := range heights {
h := int64(height) h := int64(height)
if err := pool.sendRequest(h); err != nil { if !pool.sendRequest(h) {
// Errors from sendRequest() are handled by this function
return return
} }
delete(pool.requests, h) delete(pool.requests, h)
} }
} }
func (pool *blockPool) sendRequest(height int64) error { func (pool *blockPool) sendRequest(height int64) bool {
for _, peer := range pool.peers { for _, peer := range pool.peers {
if peer.numPending >= int32(maxRequestsPerPeer) { if peer.numPending >= int32(maxRequestsPerPeer) {
continue continue
@ -243,7 +242,6 @@ func (pool *blockPool) sendRequest(height int64) error {
pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height) pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height)
if err := pool.toBcR.sendBlockRequest(peer.id, height); err == errNilPeerForBlockRequest { if err := pool.toBcR.sendBlockRequest(peer.id, height); err == errNilPeerForBlockRequest {
pool.removePeer(peer.id, err) pool.removePeer(peer.id, err)
pool.toBcR.sendPeerError(err, peer.id)
} }
pool.blocks[height] = peer.id pool.blocks[height] = peer.id
@ -252,10 +250,10 @@ func (pool *blockPool) sendRequest(height int64) error {
peer.blocks[height] = nil peer.blocks[height] = nil
peer.incrPending() peer.incrPending()
return nil return true
} }
pool.logger.Error("could not find peer to send request for block at height", "height", height) pool.logger.Error("could not find peer to send request for block at height", "height", height)
return errNoPeerFoundForHeight return false
} }
// Validates that the block comes from the peer it was expected from and stores it in the 'blocks' map. // Validates that the block comes from the peer it was expected from and stores it in the 'blocks' map.
@ -289,12 +287,12 @@ func (pool *blockPool) getBlockAndPeerAtHeight(height int64) (bData *blockData,
peerID := pool.blocks[height] peerID := pool.blocks[height]
peer := pool.peers[peerID] peer := pool.peers[peerID]
if peer == nil { if peer == nil {
return &blockData{}, errMissingBlocks return nil, errMissingBlocks
} }
block, ok := peer.blocks[height] block, ok := peer.blocks[height]
if !ok || block == nil { if !ok || block == nil {
return &blockData{}, errMissingBlocks return nil, errMissingBlocks
} }
return &blockData{peer: peer, block: block}, nil return &blockData{peer: peer, block: block}, nil
@ -308,14 +306,12 @@ func (pool *blockPool) getNextTwoBlocks() (first, second *blockData, err error)
err = err2 err = err2
} }
if err == errMissingBlocks { pool.logger.Debug("blocks at height and/ or height+1", "first", first, "second", second)
// We need both to sync the first block.
pool.logger.Error("missing blocks at height and/ or height+1", "height", pool.height)
}
return return
} }
// Remove peers that sent us the first two blocks, blocks will also be removed by removePeer(). // Remove the peers that sent us the first two blocks, blocks are removed by removePeer().
func (pool *blockPool) invalidateFirstTwoBlocks(err error) { func (pool *blockPool) invalidateFirstTwoBlocks(err error) {
first, err1 := pool.getBlockAndPeerAtHeight(pool.height) first, err1 := pool.getBlockAndPeerAtHeight(pool.height)
second, err2 := pool.getBlockAndPeerAtHeight(pool.height + 1) second, err2 := pool.getBlockAndPeerAtHeight(pool.height + 1)
@ -339,6 +335,13 @@ func (pool *blockPool) processedCurrentHeightBlock() {
pool.removeShortPeers() pool.removeShortPeers()
} }
func (pool *blockPool) removePeerAtCurrentHeight(err error) {
first, err := pool.getBlockAndPeerAtHeight(pool.height)
if err == nil {
pool.removePeer(first.peer.id, err)
}
}
func (pool *blockPool) cleanup() { func (pool *blockPool) cleanup() {
for _, peer := range pool.peers { for _, peer := range pool.peers {
if peer.timeout != nil { if peer.timeout != nil {

View File

@ -1,7 +1,6 @@
package blockchain package blockchain
import ( import (
"reflect"
"testing" "testing"
"time" "time"
@ -17,13 +16,6 @@ type testPeer struct {
height int64 height int64
} }
type testPeerResult struct {
id p2p.ID
height int64
numPending int32
blocks map[int64]*types.Block
}
type testBcR struct { type testBcR struct {
logger log.Logger logger log.Logger
} }
@ -62,8 +54,11 @@ type tPBlocks struct {
create bool create bool
} }
// Makes a block pool with specified current height, list of peers, block requests and block responses
func makeBlockPool(bcr *testBcR, height int64, peers []bpPeer, blocks map[int64]tPBlocks) *blockPool { func makeBlockPool(bcr *testBcR, height int64, peers []bpPeer, blocks map[int64]tPBlocks) *blockPool {
bPool := newBlockPool(height, bcr) bPool := newBlockPool(height, bcr)
bPool.setLogger(bcr.logger)
txs := []types.Tx{types.Tx("foo"), types.Tx("bar")} txs := []types.Tx{types.Tx("foo"), types.Tx("bar")}
var maxH int64 var maxH int64
@ -80,122 +75,116 @@ func makeBlockPool(bcr *testBcR, height int64, peers []bpPeer, blocks map[int64]
bPool.blocks[h] = p.id bPool.blocks[h] = p.id
bPool.peers[p.id].blocks[h] = nil bPool.peers[p.id].blocks[h] = nil
if p.create { if p.create {
// simulate that a block at height h has been received
bPool.peers[p.id].blocks[h] = types.MakeBlock(int64(h), txs, nil, nil) bPool.peers[p.id].blocks[h] = types.MakeBlock(int64(h), txs, nil, nil)
} else { } else {
// simulate that a request for block at height h has been sent to peer p.id
bPool.peers[p.id].incrPending() bPool.peers[p.id].incrPending()
} }
} }
bPool.setLogger(bcr.logger)
return bPool return bPool
} }
func poolCopy(pool *blockPool) *blockPool { func assertPeerSetsEquivalent(t *testing.T, set1 map[p2p.ID]*bpPeer, set2 map[p2p.ID]*bpPeer) {
return &blockPool{ assert.Equal(t, len(set1), len(set2))
peers: peersCopy(pool.peers), for peerID, peer1 := range set1 {
logger: pool.logger, peer2 := set2[peerID]
blocks: pool.blocks, assert.NotNil(t, peer2)
requests: pool.requests, assert.Equal(t, peer1.numPending, peer2.numPending)
height: pool.height, assert.Equal(t, peer1.height, peer2.height)
nextRequestHeight: pool.height, assert.Equal(t, len(peer1.blocks), len(peer2.blocks))
maxPeerHeight: pool.maxPeerHeight, for h, block1 := range peer1.blocks {
toBcR: pool.toBcR, block2 := peer2.blocks[h]
// block1 and block2 could be nil if a request was made but no block was received
assert.Equal(t, block1, block2)
}
} }
} }
func peersCopy(peers map[p2p.ID]*bpPeer) map[p2p.ID]*bpPeer { func assertBlockPoolEquivalent(t *testing.T, poolWanted, pool *blockPool) {
peerCopy := make(map[p2p.ID]*bpPeer) assert.Equal(t, poolWanted.blocks, pool.blocks)
for _, p := range peers { assertPeerSetsEquivalent(t, poolWanted.peers, pool.peers)
peerCopy[p.id] = newBPPeer(p.id, p.height, p.errFunc) assert.Equal(t, poolWanted.maxPeerHeight, pool.maxPeerHeight)
}
return peerCopy
} }
func TestBlockPoolUpdatePeerNoBlocks(t *testing.T) { func TestBlockPoolUpdatePeer(t *testing.T) {
testBcR := newTestBcR() testBcR := newTestBcR()
tests := []struct { tests := []struct {
name string name string
pool *blockPool pool *blockPool
args testPeer args testPeer
poolWanted *blockPool
errWanted error errWanted error
addWanted bool
delWanted bool
maxHeightWanted int64
}{ }{
{ {
name: "add a first short peer", name: "add a first short peer",
pool: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}), pool: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
args: testPeer{"P1", 50}, args: testPeer{"P1", 50},
errWanted: errPeerTooShort, errWanted: errPeerTooShort,
maxHeightWanted: int64(0), poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
}, },
{ {
name: "add a first good peer", name: "add a first good peer",
pool: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}), pool: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
args: testPeer{"P1", 101}, args: testPeer{"P1", 101},
addWanted: true, poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 101}}, map[int64]tPBlocks{}),
maxHeightWanted: int64(101),
}, },
{ {
name: "increase the height of P1 from 120 to 123", name: "increase the height of P1 from 120 to 123",
pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}),
args: testPeer{"P1", 123}, args: testPeer{"P1", 123},
maxHeightWanted: int64(123), poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 123}}, map[int64]tPBlocks{}),
}, },
{ {
name: "decrease the height of P1 from 120 to 110", name: "decrease the height of P1 from 120 to 110 (still over current processing height)",
pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}),
args: testPeer{"P1", 110}, args: testPeer{"P1", 110},
maxHeightWanted: int64(110), poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 110}}, map[int64]tPBlocks{}),
}, },
{ {
name: "decrease the height of P1 from 120 to 90", name: "decrease the height of P1 from 120 to 90 (under current processing height)",
pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}),
args: testPeer{"P1", 90}, args: testPeer{"P1", 90},
delWanted: true,
errWanted: errPeerTooShort, errWanted: errPeerTooShort,
maxHeightWanted: int64(0), poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
},
{
name: "decrease the height of P1 from 105 to 102 with blocks",
pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 105}},
map[int64]tPBlocks{
100: {"P1", true}, 101: {"P1", true}, 102: {"P1", true},
103: {"P1", true}, 104: {"P1", false}}),
args: testPeer{"P1", 102},
poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 102}},
map[int64]tPBlocks{
100: {"P1", true}, 101: {"P1", true}, 102: {"P1", true}}),
},
{
name: "decrease the height of P1 from 102 to 99 with blocks",
pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 102}},
map[int64]tPBlocks{
100: {"P1", true}, 101: {"P1", true}, 102: {"P1", true}}),
args: testPeer{"P1", 99},
poolWanted: makeBlockPool(testBcR, 100, []bpPeer{},
map[int64]tPBlocks{}),
errWanted: errPeerTooShort,
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
pool := tt.pool pool := tt.pool
beforePool := poolCopy(pool)
err := pool.updatePeer(tt.args.id, tt.args.height) err := pool.updatePeer(tt.args.id, tt.args.height)
if err != tt.errWanted { assert.Equal(t, tt.errWanted, err)
t.Errorf("blockPool.updatePeer() error = %v, wantErr %v", err, tt.errWanted) assert.Equal(t, tt.poolWanted.blocks, tt.pool.blocks)
} assertPeerSetsEquivalent(t, tt.poolWanted.peers, tt.pool.peers)
assert.Equal(t, tt.poolWanted.maxPeerHeight, tt.pool.maxPeerHeight)
if tt.errWanted != nil {
// error case
if tt.delWanted {
assert.Equal(t, len(beforePool.peers)-1, len(pool.peers))
return
}
assert.Equal(t, beforePool, pool)
return
}
if tt.addWanted {
// add case only
assert.Equal(t, len(beforePool.peers)+1, len(pool.peers))
} else {
// update case only
assert.Equal(t, len(beforePool.peers), len(pool.peers))
}
// both add and update
assert.Equal(t, pool.peers[tt.args.id].height, tt.args.height)
assert.Equal(t, tt.maxHeightWanted, pool.maxPeerHeight)
}) })
} }
} }
func TestBlockPoolRemovePeerNoBlocks(t *testing.T) { func TestBlockPoolRemovePeer(t *testing.T) {
testBcR := newTestBcR() testBcR := newTestBcR()
type args struct { type args struct {
@ -207,101 +196,112 @@ func TestBlockPoolRemovePeerNoBlocks(t *testing.T) {
name string name string
pool *blockPool pool *blockPool
args args args args
maxHeightWanted int64 poolWanted *blockPool
}{ }{
{ {
name: "attempt to delete non-existing peer", name: "attempt to delete non-existing peer",
pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}),
args: args{"P99", nil}, args: args{"P99", nil},
maxHeightWanted: int64(120), poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}),
}, },
{ {
name: "delete the only peer", name: "delete the only peer without blocks",
pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}),
args: args{"P1", nil}, args: args{"P1", nil},
maxHeightWanted: int64(0), poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
}, },
{ {
name: "delete the shortest of two peers", name: "delete the shortest of two peers without blocks",
pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 120}}, map[int64]tPBlocks{}), pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 120}}, map[int64]tPBlocks{}),
args: args{"P1", nil}, args: args{"P1", nil},
maxHeightWanted: int64(120), poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P2", height: 120}}, map[int64]tPBlocks{}),
}, },
{ {
name: "delete the tallest of two peers", name: "delete the tallest of two peers without blocks",
pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 120}}, map[int64]tPBlocks{}), pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 120}}, map[int64]tPBlocks{}),
args: args{"P2", nil}, args: args{"P2", nil},
maxHeightWanted: int64(100), poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 100}}, map[int64]tPBlocks{}),
},
{
name: "delete the only peer with block requests sent and blocks received",
pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}},
map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", false}}),
args: args{"P1", nil},
poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
},
{
name: "delete the shortest of two peers with block requests sent and blocks received",
pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}, {id: "P2", height: 200}},
map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", false}}),
args: args{"P1", nil},
poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P2", height: 200}}, map[int64]tPBlocks{}),
},
{
name: "delete the tallest of two peers with block requests sent and blocks received",
pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}, {id: "P2", height: 110}},
map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", false}}),
args: args{"P1", nil},
poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P2", height: 110}}, map[int64]tPBlocks{}),
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
tt.pool.removePeer(tt.args.peerID, tt.args.err) tt.pool.removePeer(tt.args.peerID, tt.args.err)
assert.Equal(t, tt.maxHeightWanted, tt.pool.maxPeerHeight) assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool)
_, ok := tt.pool.peers[tt.args.peerID]
assert.False(t, ok)
}) })
} }
} }
func TestBlockPoolRemoveShortPeersNoBlocks(t *testing.T) { func TestBlockPoolRemoveShortPeers(t *testing.T) {
testBcR := newTestBcR() testBcR := newTestBcR()
tests := []struct { tests := []struct {
name string name string
pool *blockPool pool *blockPool
maxHeightWanted int64 poolWanted *blockPool
noChange bool
}{ }{
{ {
name: "no short peers", name: "no short peers",
pool: makeBlockPool(testBcR, 100, pool: makeBlockPool(testBcR, 100,
[]bpPeer{{id: "P1", height: 100}, {id: "P2", height: 110}, {id: "P3", height: 120}}, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 110}, {id: "P3", height: 120}}, map[int64]tPBlocks{}),
map[int64]tPBlocks{}), poolWanted: makeBlockPool(testBcR, 100,
maxHeightWanted: int64(120), []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 110}, {id: "P3", height: 120}}, map[int64]tPBlocks{}),
noChange: true,
}, },
{ {
name: "one short peers", name: "one short peer",
pool: makeBlockPool(testBcR, 100, pool: makeBlockPool(testBcR, 100,
[]bpPeer{{id: "P1", height: 100}, {id: "P2", height: 90}, {id: "P3", height: 120}}, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 90}, {id: "P3", height: 120}}, map[int64]tPBlocks{}),
map[int64]tPBlocks{}), poolWanted: makeBlockPool(testBcR, 100,
maxHeightWanted: int64(120), []bpPeer{{id: "P1", height: 100}, {id: "P3", height: 120}}, map[int64]tPBlocks{}),
}, },
{ {
name: "all short peers", name: "all short peers",
pool: makeBlockPool(testBcR, 100, pool: makeBlockPool(testBcR, 100,
[]bpPeer{{id: "P1", height: 90}, {id: "P2", height: 91}, {id: "P3", height: 92}}, []bpPeer{{id: "P1", height: 90}, {id: "P2", height: 91}, {id: "P3", height: 92}}, map[int64]tPBlocks{}),
map[int64]tPBlocks{}), poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
maxHeightWanted: int64(0),
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
pool := tt.pool pool := tt.pool
beforePool := poolCopy(pool)
pool.removeShortPeers() pool.removeShortPeers()
assert.Equal(t, tt.maxHeightWanted, pool.maxPeerHeight) assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool)
if tt.noChange {
assert.Equal(t, len(beforePool.peers), len(pool.peers))
return
}
for _, peer := range tt.pool.peers {
bPeer, bok := beforePool.peers[peer.id]
if bok && bPeer.height < beforePool.height {
_, ok := pool.peers[peer.id]
assert.False(t, ok)
}
}
}) })
} }
} }
func TestBlockPoolSendRequestBatch(t *testing.T) { func TestBlockPoolSendRequestBatch(t *testing.T) {
type testPeerResult struct {
id p2p.ID
numPending int32
}
testBcR := newTestBcR() testBcR := newTestBcR()
tests := []struct { tests := []struct {
name string name string
pool *blockPool pool *blockPool
@ -315,7 +315,7 @@ func TestBlockPoolSendRequestBatch(t *testing.T) {
pool: makeBlockPool(testBcR, 10, []bpPeer{{id: "P1", height: 100}}, map[int64]tPBlocks{}), pool: makeBlockPool(testBcR, 10, []bpPeer{{id: "P1", height: 100}}, map[int64]tPBlocks{}),
maxRequestsPerPeer: 2, maxRequestsPerPeer: 2,
expRequests: map[int64]bool{10: true, 11: true}, expRequests: map[int64]bool{10: true, 11: true},
expPeerResults: []testPeerResult{{id: "P1", height: 100, numPending: 2, blocks: map[int64]*types.Block{10: nil, 11: nil}}}, expPeerResults: []testPeerResult{{id: "P1", numPending: 2}},
expNumPending: 2, expNumPending: 2,
}, },
{ {
@ -324,8 +324,8 @@ func TestBlockPoolSendRequestBatch(t *testing.T) {
maxRequestsPerPeer: 2, maxRequestsPerPeer: 2,
expRequests: map[int64]bool{10: true, 11: true}, expRequests: map[int64]bool{10: true, 11: true},
expPeerResults: []testPeerResult{ expPeerResults: []testPeerResult{
{id: "P1", height: 100, numPending: 2, blocks: map[int64]*types.Block{10: nil, 11: nil}}, {id: "P1", numPending: 2},
{id: "P2", height: 100, numPending: 2, blocks: map[int64]*types.Block{12: nil, 13: nil}}}, {id: "P2", numPending: 2}},
expNumPending: 4, expNumPending: 4,
}, },
} }
@ -364,7 +364,8 @@ func TestBlockPoolAddBlock(t *testing.T) {
name string name string
pool *blockPool pool *blockPool
args args args args
wantErr bool poolWanted *blockPool
errWanted error
}{ }{
{name: "block from unknown peer", {name: "block from unknown peer",
pool: makeBlockPool(testBcR, 10, []bpPeer{{id: "P1", height: 100}}, map[int64]tPBlocks{}), pool: makeBlockPool(testBcR, 10, []bpPeer{{id: "P1", height: 100}}, map[int64]tPBlocks{}),
@ -373,7 +374,8 @@ func TestBlockPoolAddBlock(t *testing.T) {
block: types.MakeBlock(int64(10), txs, nil, nil), block: types.MakeBlock(int64(10), txs, nil, nil),
blockSize: 100, blockSize: 100,
}, },
wantErr: true, poolWanted: makeBlockPool(testBcR, 10, []bpPeer{{id: "P1", height: 100}}, map[int64]tPBlocks{}),
errWanted: errBadDataFromPeer,
}, },
{name: "unexpected block 11 from known peer - waiting for 10", {name: "unexpected block 11 from known peer - waiting for 10",
pool: makeBlockPool(testBcR, 10, pool: makeBlockPool(testBcR, 10,
@ -384,7 +386,10 @@ func TestBlockPoolAddBlock(t *testing.T) {
block: types.MakeBlock(int64(11), txs, nil, nil), block: types.MakeBlock(int64(11), txs, nil, nil),
blockSize: 100, blockSize: 100,
}, },
wantErr: true, poolWanted: makeBlockPool(testBcR, 10,
[]bpPeer{{id: "P1", height: 100}},
map[int64]tPBlocks{10: {"P1", false}}),
errWanted: errBadDataFromPeer,
}, },
{name: "unexpected block 10 from known peer - already have 10", {name: "unexpected block 10 from known peer - already have 10",
pool: makeBlockPool(testBcR, 10, pool: makeBlockPool(testBcR, 10,
@ -395,7 +400,24 @@ func TestBlockPoolAddBlock(t *testing.T) {
block: types.MakeBlock(int64(10), txs, nil, nil), block: types.MakeBlock(int64(10), txs, nil, nil),
blockSize: 100, blockSize: 100,
}, },
wantErr: true, poolWanted: makeBlockPool(testBcR, 10,
[]bpPeer{{id: "P1", height: 100}},
map[int64]tPBlocks{10: {"P1", true}}),
errWanted: errBadDataFromPeer,
},
{name: "unexpected block 10 from known peer P2 - expected 10 to come from P1",
pool: makeBlockPool(testBcR, 10,
[]bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}},
map[int64]tPBlocks{10: {"P1", false}}),
args: args{
peerID: "P2",
block: types.MakeBlock(int64(10), txs, nil, nil),
blockSize: 100,
},
poolWanted: makeBlockPool(testBcR, 10,
[]bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}},
map[int64]tPBlocks{10: {"P1", false}}),
errWanted: errBadDataFromPeer,
}, },
{name: "expected block from known peer", {name: "expected block from known peer",
pool: makeBlockPool(testBcR, 10, pool: makeBlockPool(testBcR, 10,
@ -406,16 +428,18 @@ func TestBlockPoolAddBlock(t *testing.T) {
block: types.MakeBlock(int64(10), txs, nil, nil), block: types.MakeBlock(int64(10), txs, nil, nil),
blockSize: 100, blockSize: 100,
}, },
wantErr: false, poolWanted: makeBlockPool(testBcR, 10,
[]bpPeer{{id: "P1", height: 100}},
map[int64]tPBlocks{10: {"P1", true}}),
errWanted: nil,
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
pool := tt.pool err := tt.pool.addBlock(tt.args.peerID, tt.args.block, tt.args.blockSize)
if err := pool.addBlock(tt.args.peerID, tt.args.block, tt.args.blockSize); (err != nil) != tt.wantErr { assert.Equal(t, tt.errWanted, err)
t.Errorf("blockPool.addBlock() error = %v, wantErr %v", err, tt.wantErr) assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool)
}
}) })
} }
} }
@ -428,16 +452,14 @@ func TestBlockPoolGetNextTwoBlocks(t *testing.T) {
pool *blockPool pool *blockPool
firstWanted int64 firstWanted int64
secondWanted int64 secondWanted int64
wantErr bool errWanted error
}{ }{
{ {
name: "both blocks missing", name: "both blocks missing",
pool: makeBlockPool(testBcR, 10, pool: makeBlockPool(testBcR, 10,
[]bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}},
map[int64]tPBlocks{15: {"P1", true}, 16: {"P2", true}}), map[int64]tPBlocks{15: {"P1", true}, 16: {"P2", true}}),
firstWanted: 0, errWanted: errMissingBlocks,
secondWanted: 0,
wantErr: true,
}, },
{ {
name: "second block missing", name: "second block missing",
@ -445,17 +467,15 @@ func TestBlockPoolGetNextTwoBlocks(t *testing.T) {
[]bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}},
map[int64]tPBlocks{15: {"P1", true}, 18: {"P2", true}}), map[int64]tPBlocks{15: {"P1", true}, 18: {"P2", true}}),
firstWanted: 15, firstWanted: 15,
secondWanted: 0, errWanted: errMissingBlocks,
wantErr: true,
}, },
{ {
name: "first block missing", name: "first block missing",
pool: makeBlockPool(testBcR, 15, pool: makeBlockPool(testBcR, 15,
[]bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}},
map[int64]tPBlocks{18: {"P1", true}, 16: {"P2", true}}), map[int64]tPBlocks{16: {"P2", true}, 18: {"P2", true}}),
firstWanted: 0,
secondWanted: 16, secondWanted: 16,
wantErr: true, errWanted: errMissingBlocks,
}, },
{ {
name: "both blocks present", name: "both blocks present",
@ -464,7 +484,6 @@ func TestBlockPoolGetNextTwoBlocks(t *testing.T) {
map[int64]tPBlocks{10: {"P1", true}, 11: {"P2", true}}), map[int64]tPBlocks{10: {"P1", true}, 11: {"P2", true}}),
firstWanted: 10, firstWanted: 10,
secondWanted: 11, secondWanted: 11,
wantErr: false,
}, },
} }
@ -472,22 +491,20 @@ func TestBlockPoolGetNextTwoBlocks(t *testing.T) {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
pool := tt.pool pool := tt.pool
gotFirst, gotSecond, err := pool.getNextTwoBlocks() gotFirst, gotSecond, err := pool.getNextTwoBlocks()
if (err != nil) != tt.wantErr { assert.Equal(t, tt.errWanted, err)
t.Errorf("blockPool.getNextTwoBlocks() error = %v, wantErr %v", err, tt.wantErr)
}
if tt.firstWanted != 0 { if tt.firstWanted != 0 {
peer := pool.blocks[tt.firstWanted] peer := pool.blocks[tt.firstWanted]
block := pool.peers[peer].blocks[tt.firstWanted] block := pool.peers[peer].blocks[tt.firstWanted]
if !reflect.DeepEqual(gotFirst.block, block) { assert.Equal(t, gotFirst.block, block,
t.Errorf("blockPool.getNextTwoBlocks() gotFirst = %v, want %v", gotFirst.block.Height, tt.firstWanted) "blockPool.getNextTwoBlocks() gotFirst = %v, want %v", gotFirst.block.Height, tt.firstWanted)
}
} }
if tt.secondWanted != 0 { if tt.secondWanted != 0 {
peer := pool.blocks[tt.secondWanted] peer := pool.blocks[tt.secondWanted]
block := pool.peers[peer].blocks[tt.secondWanted] block := pool.peers[peer].blocks[tt.secondWanted]
if !reflect.DeepEqual(gotSecond.block, block) { assert.Equal(t, gotSecond.block, block,
t.Errorf("blockPool.getNextTwoBlocks() gotFirst = %v, want %v", gotSecond.block.Height, tt.secondWanted) "blockPool.getNextTwoBlocks() gotFirst = %v, want %v", gotSecond.block.Height, tt.secondWanted)
}
} }
}) })
} }
@ -499,73 +516,128 @@ func TestBlockPoolInvalidateFirstTwoBlocks(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
pool *blockPool pool *blockPool
firstWanted int64 poolWanted *blockPool
secondWanted int64
wantChange bool
}{ }{
{ {
name: "both blocks missing", name: "both blocks missing",
pool: makeBlockPool(testBcR, 10, pool: makeBlockPool(testBcR, 10,
[]bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}},
map[int64]tPBlocks{15: {"P1", true}, 16: {"P2", true}}), map[int64]tPBlocks{15: {"P1", true}, 16: {"P2", true}}),
firstWanted: 0, poolWanted: makeBlockPool(testBcR, 10,
secondWanted: 0, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}},
wantChange: false, map[int64]tPBlocks{15: {"P1", true}, 16: {"P2", true}}),
}, },
{ {
name: "second block missing", name: "second block missing",
pool: makeBlockPool(testBcR, 15, pool: makeBlockPool(testBcR, 15,
[]bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}},
map[int64]tPBlocks{15: {"P1", true}, 18: {"P2", true}}), map[int64]tPBlocks{15: {"P1", true}, 18: {"P2", true}}),
firstWanted: 15, poolWanted: makeBlockPool(testBcR, 15,
secondWanted: 0, []bpPeer{{id: "P2", height: 100}},
wantChange: true, map[int64]tPBlocks{18: {"P2", true}}),
}, },
{ {
name: "first block missing", name: "first block missing",
pool: makeBlockPool(testBcR, 15, pool: makeBlockPool(testBcR, 15,
[]bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}},
map[int64]tPBlocks{18: {"P1", true}, 16: {"P2", true}}), map[int64]tPBlocks{18: {"P1", true}, 16: {"P2", true}}),
firstWanted: 0, poolWanted: makeBlockPool(testBcR, 15,
secondWanted: 16, []bpPeer{{id: "P1", height: 100}},
wantChange: true, map[int64]tPBlocks{18: {"P1", true}}),
}, },
{ {
name: "both blocks present", name: "both blocks present",
pool: makeBlockPool(testBcR, 10, pool: makeBlockPool(testBcR, 10,
[]bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}},
map[int64]tPBlocks{10: {"P1", true}, 11: {"P2", true}}), map[int64]tPBlocks{10: {"P1", true}, 11: {"P2", true}}),
firstWanted: 10, poolWanted: makeBlockPool(testBcR, 10,
secondWanted: 11, []bpPeer{},
wantChange: true, map[int64]tPBlocks{}),
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
pool := tt.pool tt.pool.invalidateFirstTwoBlocks(errNoPeerResponse)
gotFirst, gotSecond, _ := pool.getNextTwoBlocks() assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool)
})
beforePool := poolCopy(pool) }
pool.invalidateFirstTwoBlocks(errNoPeerResponse) }
if !tt.wantChange {
assert.Equal(t, len(beforePool.peers), len(pool.peers)) func TestProcessedCurrentHeightBlock(t *testing.T) {
return testBcR := newTestBcR()
}
if tt.firstWanted != 0 { tests := []struct {
_, ok := pool.peers[gotFirst.peer.id] name string
assert.False(t, ok) pool *blockPool
_, ok = pool.blocks[tt.firstWanted] poolWanted *blockPool
assert.False(t, ok) }{
assert.True(t, pool.requests[tt.firstWanted]) {
} name: "one peer",
if tt.secondWanted != 0 { pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}},
_, ok := pool.peers[gotSecond.peer.id] map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", true}}),
assert.False(t, ok) poolWanted: makeBlockPool(testBcR, 101, []bpPeer{{id: "P1", height: 120}},
_, ok = pool.blocks[tt.secondWanted] map[int64]tPBlocks{101: {"P1", true}}),
assert.False(t, ok) },
assert.True(t, pool.requests[tt.secondWanted]) {
} name: "multiple peers",
pool: makeBlockPool(testBcR, 100,
[]bpPeer{{id: "P1", height: 120}, {id: "P2", height: 120}, {id: "P3", height: 130}},
map[int64]tPBlocks{
100: {"P1", true}, 104: {"P1", true}, 105: {"P1", false},
101: {"P2", true}, 103: {"P2", false},
102: {"P3", true}, 106: {"P3", true}}),
poolWanted: makeBlockPool(testBcR, 101,
[]bpPeer{{id: "P1", height: 120}, {id: "P2", height: 120}, {id: "P3", height: 130}},
map[int64]tPBlocks{
104: {"P1", true}, 105: {"P1", false},
101: {"P2", true}, 103: {"P2", false},
102: {"P3", true}, 106: {"P3", true}}),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.pool.processedCurrentHeightBlock()
assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool)
})
}
}
func TestRemovePeerAtCurrentHeight(t *testing.T) {
testBcR := newTestBcR()
tests := []struct {
name string
pool *blockPool
poolWanted *blockPool
}{
{
name: "one peer",
pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}},
map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", true}}),
poolWanted: makeBlockPool(testBcR, 101, []bpPeer{}, map[int64]tPBlocks{}),
},
{
name: "multiple peers",
pool: makeBlockPool(testBcR, 100,
[]bpPeer{{id: "P1", height: 120}, {id: "P2", height: 120}, {id: "P3", height: 130}},
map[int64]tPBlocks{
100: {"P1", true}, 104: {"P1", true}, 105: {"P1", false},
101: {"P2", true}, 103: {"P2", false},
102: {"P3", true}, 106: {"P3", true}}),
poolWanted: makeBlockPool(testBcR, 101,
[]bpPeer{{id: "P2", height: 120}, {id: "P3", height: 130}},
map[int64]tPBlocks{
101: {"P2", true}, 103: {"P2", false},
102: {"P3", true}, 106: {"P3", true}}),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.pool.removePeerAtCurrentHeight(errNoPeerResponse)
assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool)
}) })
} }
} }

View File

@ -194,24 +194,25 @@ func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcStatusRequestMessa
} }
func (bcR *BlockchainReactor) sendMessageToFSMAsync(msg bReactorMessageData) { func (bcR *BlockchainReactor) sendMessageToFSMAsync(msg bReactorMessageData) {
bcR.Logger.Error("send message to FSM for processing", "msg", msg.String()) bcR.Logger.Debug("send message to FSM for processing", "msg", msg.String())
bcR.messagesForFSMCh <- msg bcR.messagesForFSMCh <- msg
} }
func (bcR *BlockchainReactor) sendRemovePeerToFSM(peerID p2p.ID) { func (bcR *BlockchainReactor) sendErrorToFSMAsync(msg bReactorMessageData) {
msgData := bReactorMessageData{ bcR.Logger.Debug("send message to FSM for processing", "msg", msg.String())
event: peerRemoveEv, bcR.errorsForFSMCh <- msg
data: bReactorEventData{
peerId: peerID,
err: errSwitchRemovesPeer,
},
}
bcR.sendMessageToFSMAsync(msgData)
} }
// RemovePeer implements Reactor by removing peer from the pool. // RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
bcR.sendRemovePeerToFSM(peer.ID()) msgData := bReactorMessageData{
event: peerRemoveEv,
data: bReactorEventData{
peerId: peer.ID(),
err: errSwitchRemovesPeer,
},
}
bcR.sendErrorToFSMAsync(msgData)
} }
// Receive implements Reactor by handling 4 types of messages (look below). // Receive implements Reactor by handling 4 types of messages (look below).
@ -280,8 +281,8 @@ func (bcR *BlockchainReactor) poolRoutine() {
bcR.fsm.start() bcR.fsm.start()
trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) processReceivedBlockTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
trySendTicker := time.NewTicker(trySendIntervalMS * time.Millisecond) sendBlockRequestTicker := time.NewTicker(trySendIntervalMS * time.Millisecond)
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second) switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
@ -289,20 +290,13 @@ func (bcR *BlockchainReactor) poolRoutine() {
lastHundred := time.Now() lastHundred := time.Now()
lastRate := 0.0 lastRate := 0.0
doProcessCh := make(chan struct{}, 1) doProcessBlockCh := make(chan struct{}, 1)
doSendCh := make(chan struct{}, 1)
ForLoop: ForLoop:
for { for {
select { select {
case <-trySendTicker.C: // chan time case <-sendBlockRequestTicker.C:
select {
case doSendCh <- struct{}{}:
default:
}
case <-doSendCh:
// Tell FSM to make more requests. // Tell FSM to make more requests.
// The maxNumPendingRequests may be changed based on low/ high watermark thresholds for // The maxNumPendingRequests may be changed based on low/ high watermark thresholds for
// - the number of blocks received and waiting to be processed, // - the number of blocks received and waiting to be processed,
@ -345,13 +339,13 @@ ForLoop:
break ForLoop break ForLoop
} }
case <-trySyncTicker.C: // chan time case <-processReceivedBlockTicker.C: // chan time
select { select {
case doProcessCh <- struct{}{}: case doProcessBlockCh <- struct{}{}:
default: default:
} }
case <-doProcessCh: case <-doProcessBlockCh:
err := bcR.processBlocksFromPoolRoutine() err := bcR.processBlocksFromPoolRoutine()
if err == errMissingBlocks { if err == errMissingBlocks {
continue ForLoop continue ForLoop
@ -369,7 +363,7 @@ ForLoop:
continue ForLoop continue ForLoop
} }
doProcessCh <- struct{}{} doProcessBlockCh <- struct{}{}
bcR.blocksSynced++ bcR.blocksSynced++
if bcR.blocksSynced%100 == 0 { if bcR.blocksSynced%100 == 0 {

View File

@ -142,6 +142,7 @@ var (
// state timers // state timers
var ( var (
waitForPeerTimeout = 2 * time.Second waitForPeerTimeout = 2 * time.Second
waitForBlockAtCurrentHeightTimeout = 5 * time.Second
) )
// errors // errors
@ -156,7 +157,6 @@ var (
errSendQueueFull = errors.New("block request not made, send-queue is full") errSendQueueFull = errors.New("block request not made, send-queue is full")
errPeerTooShort = errors.New("peer height too low, old peer removed/ new peer not added") errPeerTooShort = errors.New("peer height too low, old peer removed/ new peer not added")
errSlowPeer = errors.New("peer is not sending us data fast enough") errSlowPeer = errors.New("peer is not sending us data fast enough")
errNoPeerFoundForHeight = errors.New("could not find peer for block request")
errSwitchRemovesPeer = errors.New("switch is removing peer") errSwitchRemovesPeer = errors.New("switch is removing peer")
errTimeoutEventWrongState = errors.New("timeout event for a different state than the current one") errTimeoutEventWrongState = errors.New("timeout event for a different state than the current one")
) )
@ -220,6 +220,11 @@ func init() {
waitForBlock = &bReactorFSMState{ waitForBlock = &bReactorFSMState{
name: "waitForBlock", name: "waitForBlock",
timeout: waitForBlockAtCurrentHeightTimeout,
enter: func(fsm *bReactorFSM) {
// Stop when leaving the state.
fsm.resetStateTimer()
},
handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) { handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) {
switch ev { switch ev {
@ -239,7 +244,6 @@ func init() {
fsm.pool.removePeer(data.peerId, err) fsm.pool.removePeer(data.peerId, err)
fsm.toBcR.sendPeerError(err, data.peerId) fsm.toBcR.sendPeerError(err, data.peerId)
} }
return waitForBlock, err return waitForBlock, err
case processedBlockEv: case processedBlockEv:
@ -259,6 +263,7 @@ func init() {
fsm.stop() fsm.stop()
return finished, nil return finished, nil
} }
fsm.resetStateTimer()
} }
return waitForBlock, data.err return waitForBlock, data.err
@ -272,7 +277,20 @@ func init() {
fsm.makeNextRequests(data.maxNumRequests) fsm.makeNextRequests(data.maxNumRequests)
return waitForBlock, nil return waitForBlock, nil
case stateTimeoutEv:
if data.stateName != "waitForBlock" {
fsm.logger.Error("received a state timeout event for different state", "state", data.stateName)
return waitForBlock, errTimeoutEventWrongState
}
// We haven't received the block at current height. Remove peer.
fsm.pool.removePeerAtCurrentHeight(errNoPeerResponse)
fsm.resetStateTimer()
return waitForBlock, errNoPeerResponse
case stopFSMEv: case stopFSMEv:
if fsm.stateTimer != nil {
fsm.stateTimer.Stop()
}
return finished, errNoErrorFinished return finished, errNoErrorFinished
default: default:

View File

@ -274,17 +274,13 @@ func TestFSMBasic(t *testing.T) {
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumPendingRequests), makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumPendingRequests),
// blockResponseEv for height 1 // blockResponseEv for height 1
makeStepBlockRespEv("waitForBlock", "waitForBlock", makeStepBlockRespEv("waitForBlock", "waitForBlock", "P1", 1, []int64{}),
"P1", 1, []int64{}),
// blockResponseEv for height 2 // blockResponseEv for height 2
makeStepBlockRespEv("waitForBlock", "waitForBlock", makeStepBlockRespEv("waitForBlock", "waitForBlock", "P1", 2, []int64{1}),
"P1", 2, []int64{1}),
// blockResponseEv for height 3 // blockResponseEv for height 3
makeStepBlockRespEv("waitForBlock", "waitForBlock", makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{1, 2}),
"P2", 3, []int64{1, 2}),
// blockResponseEv for height 4 // blockResponseEv for height 4
makeStepBlockRespEv("waitForBlock", "waitForBlock", makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 4, []int64{1, 2, 3}),
"P2", 4, []int64{1, 2, 3}),
// processedBlockEv // processedBlockEv
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil), makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil),
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil), makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil),
@ -379,11 +375,9 @@ func TestFSMBlockVerificationFailure(t *testing.T) {
makeStepBlockRespEv("waitForBlock", "waitForBlock", makeStepBlockRespEv("waitForBlock", "waitForBlock",
"P2", 1, []int64{}), "P2", 1, []int64{}),
// blockResponseEv for height 2 // blockResponseEv for height 2
makeStepBlockRespEv("waitForBlock", "waitForBlock", makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 2, []int64{1}),
"P2", 2, []int64{1}),
// blockResponseEv for height 3 // blockResponseEv for height 3
makeStepBlockRespEv("waitForBlock", "waitForBlock", makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{1, 2}),
"P2", 3, []int64{1, 2}),
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil), makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil),
makeStepProcessedBlockEv("waitForBlock", "finished", nil), makeStepProcessedBlockEv("waitForBlock", "finished", nil),
@ -439,10 +433,8 @@ func TestFSMBlockVerificationFailure(t *testing.T) {
secondAfter, err2 := testBcR.fsm.pool.getBlockAndPeerAtHeight(testBcR.fsm.pool.height + 1) secondAfter, err2 := testBcR.fsm.pool.getBlockAndPeerAtHeight(testBcR.fsm.pool.height + 1)
assert.NotNil(t, err1) assert.NotNil(t, err1)
assert.NotNil(t, err2) assert.NotNil(t, err2)
assert.Nil(t, firstAfter.block) assert.Nil(t, firstAfter)
assert.Nil(t, firstAfter.peer) assert.Nil(t, secondAfter)
assert.Nil(t, secondAfter.block)
assert.Nil(t, secondAfter.peer)
} }
assert.Equal(t, step.expectedState, testBcR.fsm.state.name) assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
if step.expectedState == "finished" { if step.expectedState == "finished" {
@ -575,6 +567,89 @@ func TestFSMBadBlockFromPeer(t *testing.T) {
} }
} }
func TestFSMBlockAtCurrentHeightNeverArrives(t *testing.T) {
tests := []struct {
name string
startingHeight int64
maxRequestsPerPeer int32
steps []fsmStepTestValues
}{
{
name: "block at current height undelivered",
startingHeight: 1,
maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{
// startFSMEv
makeStepStartFSMEv(),
// statusResponseEv from P1
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
// make some requests
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumPendingRequests),
// block received for height 1
makeStepBlockRespEv("waitForBlock", "waitForBlock",
"P1", 1, []int64{}),
// block recevied for height 2
makeStepBlockRespEv("waitForBlock", "waitForBlock",
"P1", 2, []int64{1}),
// processed block at height 1
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil),
// add peer P2
makeStepStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil),
// timeout on block at height 3
makeStepStateTimeoutEv("waitForBlock", "waitForBlock", "waitForBlock", errNoPeerResponse),
// make some requests (includes redo-s for blocks 2 and 3)
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumPendingRequests),
// block received for height 2 from P2
makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 2, []int64{}),
// block received for height 3 from P2
makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{2}),
makeStepProcessedBlockEv("waitForBlock", "finished", nil),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create test reactor
testBcR := newTestReactor(tt.startingHeight)
resetTestValues()
if tt.maxRequestsPerPeer != 0 {
maxRequestsPerPeer = tt.maxRequestsPerPeer
}
for _, step := range tt.steps {
assert.Equal(t, step.currentState, testBcR.fsm.state.name)
oldNumStatusRequests := numStatusRequests
oldNumBlockRequests := numBlockRequests
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr)
if step.shouldSendStatusReq {
assert.Equal(t, oldNumStatusRequests+1, numStatusRequests)
} else {
assert.Equal(t, oldNumStatusRequests, numStatusRequests)
}
if step.blockReqIncreased {
assert.True(t, oldNumBlockRequests < numBlockRequests)
} else {
assert.Equal(t, oldNumBlockRequests, numBlockRequests)
}
for _, height := range step.blocksAdded {
_, err := testBcR.fsm.pool.getBlockAndPeerAtHeight(height)
assert.Nil(t, err)
}
assert.Equal(t, step.expectedState, testBcR.fsm.state.name)
}
})
}
}
func TestFSMPeerRelatedErrors(t *testing.T) { func TestFSMPeerRelatedErrors(t *testing.T) {
tests := []struct { tests := []struct {
name string name string

View File

@ -66,7 +66,7 @@ type BlockchainReactorPair struct {
conR *consensusReactorTest conR *consensusReactorTest
} }
func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) BlockchainReactorPair { func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) *BlockchainReactor {
if len(privVals) != 1 { if len(privVals) != 1 {
panic("only support one validator") panic("only support one validator")
} }
@ -122,10 +122,17 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain")) bcReactor.SetLogger(logger.With("module", "blockchain"))
return bcReactor
}
func newBlockchainReactorPair(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) BlockchainReactorPair {
consensusReactor := &consensusReactorTest{} consensusReactor := &consensusReactorTest{}
consensusReactor.BaseReactor = *p2p.NewBaseReactor("Consensus reactor", consensusReactor) consensusReactor.BaseReactor = *p2p.NewBaseReactor("Consensus reactor", consensusReactor)
return BlockchainReactorPair{bcReactor, consensusReactor} return BlockchainReactorPair{
newBlockchainReactor(logger, genDoc, privVals, maxBlockHeight),
consensusReactor}
} }
type consensusReactorTest struct { type consensusReactorTest struct {
@ -151,8 +158,8 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
reactorPairs := make([]BlockchainReactorPair, 2) reactorPairs := make([]BlockchainReactorPair, 2)
logger := log.TestingLogger() logger := log.TestingLogger()
reactorPairs[0] = newBlockchainReactor(logger, genDoc, privVals, maxBlockHeight) reactorPairs[0] = newBlockchainReactorPair(logger, genDoc, privVals, maxBlockHeight)
reactorPairs[1] = newBlockchainReactor(logger, genDoc, privVals, 0) reactorPairs[1] = newBlockchainReactorPair(logger, genDoc, privVals, 0)
p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch { p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR) s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR)
@ -209,7 +216,6 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
// Alternatively we could actually dial a TCP conn but // Alternatively we could actually dial a TCP conn but
// that seems extreme. // that seems extreme.
func TestFastSyncBadBlockStopsPeer(t *testing.T) { func TestFastSyncBadBlockStopsPeer(t *testing.T) {
var mtx sync.Mutex
numNodes := 4 numNodes := 4
maxBlockHeight := int64(148) maxBlockHeight := int64(148)
@ -217,7 +223,7 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) {
defer os.RemoveAll(config.RootDir) defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30) genDoc, privVals := randGenesisDoc(1, false, 30)
otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) otherChain := newBlockchainReactorPair(log.TestingLogger(), genDoc, privVals, maxBlockHeight)
defer func() { defer func() {
_ = otherChain.bcR.Stop() _ = otherChain.bcR.Stop()
_ = otherChain.conR.Stop() _ = otherChain.conR.Stop()
@ -232,10 +238,9 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) {
if i == 0 { if i == 0 {
height = maxBlockHeight height = maxBlockHeight
} }
reactorPairs[i] = newBlockchainReactor(logger[i], genDoc, privVals, height) reactorPairs[i] = newBlockchainReactorPair(logger[i], genDoc, privVals, height)
} }
mtx.Lock()
switches := p2p.MakeConnectedSwitches(config.P2P, numNodes, func(i int, s *p2p.Switch) *p2p.Switch { switches := p2p.MakeConnectedSwitches(config.P2P, numNodes, func(i int, s *p2p.Switch) *p2p.Switch {
reactorPairs[i].conR.mtx.Lock() reactorPairs[i].conR.mtx.Lock()
s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR) s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR)
@ -246,7 +251,6 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) {
return s return s
}, p2p.Connect2Switches) }, p2p.Connect2Switches)
mtx.Unlock()
defer func() { defer func() {
for _, r := range reactorPairs { for _, r := range reactorPairs {
@ -255,30 +259,30 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) {
} }
}() }()
outerFor:
for { for {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
reactorPairs[1].conR.mtx.Lock()
if reactorPairs[1].conR.switchedToConsensus {
reactorPairs[1].conR.mtx.Unlock()
break
}
reactorPairs[1].conR.mtx.Unlock()
if reactorPairs[numNodes-1].bcR.Switch.Peers().Size() == 0 { for i := 0; i < numNodes; i++ {
break reactorPairs[i].conR.mtx.Lock()
if !reactorPairs[i].conR.switchedToConsensus {
reactorPairs[i].conR.mtx.Unlock()
continue outerFor
} }
reactorPairs[i].conR.mtx.Unlock()
}
break outerFor
} }
//at this time, reactors[0-3] is the newest //at this time, reactors[0-3] is the newest
assert.Equal(t, numNodes-1, reactorPairs[1].bcR.Switch.Peers().Size()) assert.Equal(t, numNodes-1, reactorPairs[1].bcR.Switch.Peers().Size())
//mark last reactorPair as an invalid peer //mark last reactorPair as an invalid peer
mtx.Lock()
reactorPairs[numNodes-1].bcR.store = otherChain.bcR.store reactorPairs[numNodes-1].bcR.store = otherChain.bcR.store
mtx.Unlock()
lastLogger := log.TestingLogger() lastLogger := log.TestingLogger()
lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0) lastReactorPair := newBlockchainReactorPair(lastLogger, genDoc, privVals, 0)
reactorPairs = append(reactorPairs, lastReactorPair) reactorPairs = append(reactorPairs, lastReactorPair)
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
@ -310,6 +314,7 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) {
} }
assert.True(t, lastReactorPair.bcR.Switch.Peers().Size() < len(reactorPairs)-1) assert.True(t, lastReactorPair.bcR.Switch.Peers().Size() < len(reactorPairs)-1)
fmt.Println("TestFastSyncBadBlockStopsPeer finished")
} }
func setupReactors( func setupReactors(
@ -328,7 +333,7 @@ func setupReactors(
if i == 0 { if i == 0 {
height = maxBlockHeight height = maxBlockHeight
} }
reactorPairs[i] = newBlockchainReactor(logger[i], genDoc, privVals, height) reactorPairs[i] = newBlockchainReactorPair(logger[i], genDoc, privVals, height)
} }
switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch { switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch {
@ -385,7 +390,7 @@ outerFor:
assert.Equal(t, numNodes-1, reactorPairs[0].bcR.Switch.Peers().Size()) assert.Equal(t, numNodes-1, reactorPairs[0].bcR.Switch.Peers().Size())
lastLogger := log.TestingLogger() lastLogger := log.TestingLogger()
lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0) lastReactorPair := newBlockchainReactorPair(lastLogger, genDoc, privVals, 0)
reactorPairs = append(reactorPairs, lastReactorPair) reactorPairs = append(reactorPairs, lastReactorPair)
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {