diff --git a/blockchain/pool.go b/blockchain/pool.go index 7a8411c9..38ade94f 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -34,7 +34,7 @@ type blockPool struct { blocks map[int64]p2p.ID requests map[int64]bool // list of blocks to be assigned peers for blockRequest - nextRequestHeight int64 // next request to be added to requests + nextRequestHeight int64 // next height to be added to requests height int64 // processing height maxPeerHeight int64 // maximum height of all peers @@ -224,15 +224,14 @@ func (pool *blockPool) makeNextRequests(maxNumPendingRequests int32) { for _, height := range heights { h := int64(height) - if err := pool.sendRequest(h); err != nil { - // Errors from sendRequest() are handled by this function + if !pool.sendRequest(h) { return } delete(pool.requests, h) } } -func (pool *blockPool) sendRequest(height int64) error { +func (pool *blockPool) sendRequest(height int64) bool { for _, peer := range pool.peers { if peer.numPending >= int32(maxRequestsPerPeer) { continue @@ -243,7 +242,6 @@ func (pool *blockPool) sendRequest(height int64) error { pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height) if err := pool.toBcR.sendBlockRequest(peer.id, height); err == errNilPeerForBlockRequest { pool.removePeer(peer.id, err) - pool.toBcR.sendPeerError(err, peer.id) } pool.blocks[height] = peer.id @@ -252,10 +250,10 @@ func (pool *blockPool) sendRequest(height int64) error { peer.blocks[height] = nil peer.incrPending() - return nil + return true } pool.logger.Error("could not find peer to send request for block at height", "height", height) - return errNoPeerFoundForHeight + return false } // Validates that the block comes from the peer it was expected from and stores it in the 'blocks' map. @@ -289,12 +287,12 @@ func (pool *blockPool) getBlockAndPeerAtHeight(height int64) (bData *blockData, peerID := pool.blocks[height] peer := pool.peers[peerID] if peer == nil { - return &blockData{}, errMissingBlocks + return nil, errMissingBlocks } block, ok := peer.blocks[height] if !ok || block == nil { - return &blockData{}, errMissingBlocks + return nil, errMissingBlocks } return &blockData{peer: peer, block: block}, nil @@ -308,14 +306,12 @@ func (pool *blockPool) getNextTwoBlocks() (first, second *blockData, err error) err = err2 } - if err == errMissingBlocks { - // We need both to sync the first block. - pool.logger.Error("missing blocks at height and/ or height+1", "height", pool.height) - } + pool.logger.Debug("blocks at height and/ or height+1", "first", first, "second", second) + return } -// Remove peers that sent us the first two blocks, blocks will also be removed by removePeer(). +// Remove the peers that sent us the first two blocks, blocks are removed by removePeer(). func (pool *blockPool) invalidateFirstTwoBlocks(err error) { first, err1 := pool.getBlockAndPeerAtHeight(pool.height) second, err2 := pool.getBlockAndPeerAtHeight(pool.height + 1) @@ -339,6 +335,13 @@ func (pool *blockPool) processedCurrentHeightBlock() { pool.removeShortPeers() } +func (pool *blockPool) removePeerAtCurrentHeight(err error) { + first, err := pool.getBlockAndPeerAtHeight(pool.height) + if err == nil { + pool.removePeer(first.peer.id, err) + } +} + func (pool *blockPool) cleanup() { for _, peer := range pool.peers { if peer.timeout != nil { diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index 5c1f5bca..ae04fa3a 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -1,7 +1,6 @@ package blockchain import ( - "reflect" "testing" "time" @@ -17,13 +16,6 @@ type testPeer struct { height int64 } -type testPeerResult struct { - id p2p.ID - height int64 - numPending int32 - blocks map[int64]*types.Block -} - type testBcR struct { logger log.Logger } @@ -62,8 +54,11 @@ type tPBlocks struct { create bool } +// Makes a block pool with specified current height, list of peers, block requests and block responses func makeBlockPool(bcr *testBcR, height int64, peers []bpPeer, blocks map[int64]tPBlocks) *blockPool { bPool := newBlockPool(height, bcr) + bPool.setLogger(bcr.logger) + txs := []types.Tx{types.Tx("foo"), types.Tx("bar")} var maxH int64 @@ -80,122 +75,116 @@ func makeBlockPool(bcr *testBcR, height int64, peers []bpPeer, blocks map[int64] bPool.blocks[h] = p.id bPool.peers[p.id].blocks[h] = nil if p.create { + // simulate that a block at height h has been received bPool.peers[p.id].blocks[h] = types.MakeBlock(int64(h), txs, nil, nil) } else { + // simulate that a request for block at height h has been sent to peer p.id bPool.peers[p.id].incrPending() } } - bPool.setLogger(bcr.logger) return bPool } -func poolCopy(pool *blockPool) *blockPool { - return &blockPool{ - peers: peersCopy(pool.peers), - logger: pool.logger, - blocks: pool.blocks, - requests: pool.requests, - height: pool.height, - nextRequestHeight: pool.height, - maxPeerHeight: pool.maxPeerHeight, - toBcR: pool.toBcR, +func assertPeerSetsEquivalent(t *testing.T, set1 map[p2p.ID]*bpPeer, set2 map[p2p.ID]*bpPeer) { + assert.Equal(t, len(set1), len(set2)) + for peerID, peer1 := range set1 { + peer2 := set2[peerID] + assert.NotNil(t, peer2) + assert.Equal(t, peer1.numPending, peer2.numPending) + assert.Equal(t, peer1.height, peer2.height) + assert.Equal(t, len(peer1.blocks), len(peer2.blocks)) + for h, block1 := range peer1.blocks { + block2 := peer2.blocks[h] + // block1 and block2 could be nil if a request was made but no block was received + assert.Equal(t, block1, block2) + } } } -func peersCopy(peers map[p2p.ID]*bpPeer) map[p2p.ID]*bpPeer { - peerCopy := make(map[p2p.ID]*bpPeer) - for _, p := range peers { - peerCopy[p.id] = newBPPeer(p.id, p.height, p.errFunc) - } - return peerCopy +func assertBlockPoolEquivalent(t *testing.T, poolWanted, pool *blockPool) { + assert.Equal(t, poolWanted.blocks, pool.blocks) + assertPeerSetsEquivalent(t, poolWanted.peers, pool.peers) + assert.Equal(t, poolWanted.maxPeerHeight, pool.maxPeerHeight) } -func TestBlockPoolUpdatePeerNoBlocks(t *testing.T) { +func TestBlockPoolUpdatePeer(t *testing.T) { testBcR := newTestBcR() tests := []struct { - name string - pool *blockPool - args testPeer - errWanted error - addWanted bool - delWanted bool - maxHeightWanted int64 + name string + pool *blockPool + args testPeer + poolWanted *blockPool + errWanted error }{ - { - name: "add a first short peer", - pool: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}), - args: testPeer{"P1", 50}, - errWanted: errPeerTooShort, - maxHeightWanted: int64(0), + name: "add a first short peer", + pool: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}), + args: testPeer{"P1", 50}, + errWanted: errPeerTooShort, + poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}), }, { - name: "add a first good peer", - pool: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}), - args: testPeer{"P1", 101}, - addWanted: true, - maxHeightWanted: int64(101), + name: "add a first good peer", + pool: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}), + args: testPeer{"P1", 101}, + poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 101}}, map[int64]tPBlocks{}), }, { - name: "increase the height of P1 from 120 to 123", - pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), - args: testPeer{"P1", 123}, - maxHeightWanted: int64(123), + name: "increase the height of P1 from 120 to 123", + pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), + args: testPeer{"P1", 123}, + poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 123}}, map[int64]tPBlocks{}), }, { - name: "decrease the height of P1 from 120 to 110", - pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), - args: testPeer{"P1", 110}, - maxHeightWanted: int64(110), + name: "decrease the height of P1 from 120 to 110 (still over current processing height)", + pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), + args: testPeer{"P1", 110}, + poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 110}}, map[int64]tPBlocks{}), }, { - name: "decrease the height of P1 from 120 to 90", - pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), - args: testPeer{"P1", 90}, - delWanted: true, - errWanted: errPeerTooShort, - maxHeightWanted: int64(0), + name: "decrease the height of P1 from 120 to 90 (under current processing height)", + pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), + args: testPeer{"P1", 90}, + errWanted: errPeerTooShort, + poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}), + }, + { + name: "decrease the height of P1 from 105 to 102 with blocks", + pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 105}}, + map[int64]tPBlocks{ + 100: {"P1", true}, 101: {"P1", true}, 102: {"P1", true}, + 103: {"P1", true}, 104: {"P1", false}}), + args: testPeer{"P1", 102}, + poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 102}}, + map[int64]tPBlocks{ + 100: {"P1", true}, 101: {"P1", true}, 102: {"P1", true}}), + }, + { + name: "decrease the height of P1 from 102 to 99 with blocks", + pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 102}}, + map[int64]tPBlocks{ + 100: {"P1", true}, 101: {"P1", true}, 102: {"P1", true}}), + args: testPeer{"P1", 99}, + poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, + map[int64]tPBlocks{}), + errWanted: errPeerTooShort, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { pool := tt.pool - - beforePool := poolCopy(pool) err := pool.updatePeer(tt.args.id, tt.args.height) - if err != tt.errWanted { - t.Errorf("blockPool.updatePeer() error = %v, wantErr %v", err, tt.errWanted) - } - - if tt.errWanted != nil { - // error case - if tt.delWanted { - assert.Equal(t, len(beforePool.peers)-1, len(pool.peers)) - return - } - assert.Equal(t, beforePool, pool) - return - } - - if tt.addWanted { - // add case only - assert.Equal(t, len(beforePool.peers)+1, len(pool.peers)) - } else { - // update case only - assert.Equal(t, len(beforePool.peers), len(pool.peers)) - } - - // both add and update - assert.Equal(t, pool.peers[tt.args.id].height, tt.args.height) - assert.Equal(t, tt.maxHeightWanted, pool.maxPeerHeight) - + assert.Equal(t, tt.errWanted, err) + assert.Equal(t, tt.poolWanted.blocks, tt.pool.blocks) + assertPeerSetsEquivalent(t, tt.poolWanted.peers, tt.pool.peers) + assert.Equal(t, tt.poolWanted.maxPeerHeight, tt.pool.maxPeerHeight) }) } } -func TestBlockPoolRemovePeerNoBlocks(t *testing.T) { +func TestBlockPoolRemovePeer(t *testing.T) { testBcR := newTestBcR() type args struct { @@ -204,104 +193,115 @@ func TestBlockPoolRemovePeerNoBlocks(t *testing.T) { } tests := []struct { - name string - pool *blockPool - args args - maxHeightWanted int64 + name string + pool *blockPool + args args + poolWanted *blockPool }{ { - name: "attempt to delete non-existing peer", - pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), - args: args{"P99", nil}, - maxHeightWanted: int64(120), + name: "attempt to delete non-existing peer", + pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), + args: args{"P99", nil}, + poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), }, { - name: "delete the only peer", - pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), - args: args{"P1", nil}, - maxHeightWanted: int64(0), + name: "delete the only peer without blocks", + pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, map[int64]tPBlocks{}), + args: args{"P1", nil}, + poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}), }, { - name: "delete the shortest of two peers", - pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 120}}, map[int64]tPBlocks{}), - args: args{"P1", nil}, - maxHeightWanted: int64(120), + name: "delete the shortest of two peers without blocks", + pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 120}}, map[int64]tPBlocks{}), + args: args{"P1", nil}, + poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P2", height: 120}}, map[int64]tPBlocks{}), }, { - name: "delete the tallest of two peers", - pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 120}}, map[int64]tPBlocks{}), - args: args{"P2", nil}, - maxHeightWanted: int64(100), + name: "delete the tallest of two peers without blocks", + pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 120}}, map[int64]tPBlocks{}), + args: args{"P2", nil}, + poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 100}}, map[int64]tPBlocks{}), + }, + { + name: "delete the only peer with block requests sent and blocks received", + pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, + map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", false}}), + args: args{"P1", nil}, + poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}), + }, + { + name: "delete the shortest of two peers with block requests sent and blocks received", + pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}, {id: "P2", height: 200}}, + map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", false}}), + args: args{"P1", nil}, + poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P2", height: 200}}, map[int64]tPBlocks{}), + }, + { + name: "delete the tallest of two peers with block requests sent and blocks received", + pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}, {id: "P2", height: 110}}, + map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", false}}), + args: args{"P1", nil}, + poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{id: "P2", height: 110}}, map[int64]tPBlocks{}), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.pool.removePeer(tt.args.peerID, tt.args.err) - assert.Equal(t, tt.maxHeightWanted, tt.pool.maxPeerHeight) - _, ok := tt.pool.peers[tt.args.peerID] - assert.False(t, ok) + assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool) }) } } -func TestBlockPoolRemoveShortPeersNoBlocks(t *testing.T) { +func TestBlockPoolRemoveShortPeers(t *testing.T) { testBcR := newTestBcR() tests := []struct { - name string - pool *blockPool - maxHeightWanted int64 - noChange bool + name string + pool *blockPool + poolWanted *blockPool }{ { name: "no short peers", pool: makeBlockPool(testBcR, 100, - []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 110}, {id: "P3", height: 120}}, - map[int64]tPBlocks{}), - maxHeightWanted: int64(120), - noChange: true, + []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 110}, {id: "P3", height: 120}}, map[int64]tPBlocks{}), + poolWanted: makeBlockPool(testBcR, 100, + []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 110}, {id: "P3", height: 120}}, map[int64]tPBlocks{}), }, + { - name: "one short peers", + name: "one short peer", pool: makeBlockPool(testBcR, 100, - []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 90}, {id: "P3", height: 120}}, - map[int64]tPBlocks{}), - maxHeightWanted: int64(120), + []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 90}, {id: "P3", height: 120}}, map[int64]tPBlocks{}), + poolWanted: makeBlockPool(testBcR, 100, + []bpPeer{{id: "P1", height: 100}, {id: "P3", height: 120}}, map[int64]tPBlocks{}), }, + { name: "all short peers", pool: makeBlockPool(testBcR, 100, - []bpPeer{{id: "P1", height: 90}, {id: "P2", height: 91}, {id: "P3", height: 92}}, - map[int64]tPBlocks{}), - maxHeightWanted: int64(0), + []bpPeer{{id: "P1", height: 90}, {id: "P2", height: 91}, {id: "P3", height: 92}}, map[int64]tPBlocks{}), + poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { pool := tt.pool - beforePool := poolCopy(pool) - pool.removeShortPeers() - assert.Equal(t, tt.maxHeightWanted, pool.maxPeerHeight) - if tt.noChange { - assert.Equal(t, len(beforePool.peers), len(pool.peers)) - return - } - for _, peer := range tt.pool.peers { - bPeer, bok := beforePool.peers[peer.id] - if bok && bPeer.height < beforePool.height { - _, ok := pool.peers[peer.id] - assert.False(t, ok) - } - } + assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool) }) } } func TestBlockPoolSendRequestBatch(t *testing.T) { + type testPeerResult struct { + id p2p.ID + numPending int32 + } + testBcR := newTestBcR() + tests := []struct { name string pool *blockPool @@ -315,7 +315,7 @@ func TestBlockPoolSendRequestBatch(t *testing.T) { pool: makeBlockPool(testBcR, 10, []bpPeer{{id: "P1", height: 100}}, map[int64]tPBlocks{}), maxRequestsPerPeer: 2, expRequests: map[int64]bool{10: true, 11: true}, - expPeerResults: []testPeerResult{{id: "P1", height: 100, numPending: 2, blocks: map[int64]*types.Block{10: nil, 11: nil}}}, + expPeerResults: []testPeerResult{{id: "P1", numPending: 2}}, expNumPending: 2, }, { @@ -324,8 +324,8 @@ func TestBlockPoolSendRequestBatch(t *testing.T) { maxRequestsPerPeer: 2, expRequests: map[int64]bool{10: true, 11: true}, expPeerResults: []testPeerResult{ - {id: "P1", height: 100, numPending: 2, blocks: map[int64]*types.Block{10: nil, 11: nil}}, - {id: "P2", height: 100, numPending: 2, blocks: map[int64]*types.Block{12: nil, 13: nil}}}, + {id: "P1", numPending: 2}, + {id: "P2", numPending: 2}}, expNumPending: 4, }, } @@ -361,10 +361,11 @@ func TestBlockPoolAddBlock(t *testing.T) { blockSize int } tests := []struct { - name string - pool *blockPool - args args - wantErr bool + name string + pool *blockPool + args args + poolWanted *blockPool + errWanted error }{ {name: "block from unknown peer", pool: makeBlockPool(testBcR, 10, []bpPeer{{id: "P1", height: 100}}, map[int64]tPBlocks{}), @@ -373,7 +374,8 @@ func TestBlockPoolAddBlock(t *testing.T) { block: types.MakeBlock(int64(10), txs, nil, nil), blockSize: 100, }, - wantErr: true, + poolWanted: makeBlockPool(testBcR, 10, []bpPeer{{id: "P1", height: 100}}, map[int64]tPBlocks{}), + errWanted: errBadDataFromPeer, }, {name: "unexpected block 11 from known peer - waiting for 10", pool: makeBlockPool(testBcR, 10, @@ -384,7 +386,10 @@ func TestBlockPoolAddBlock(t *testing.T) { block: types.MakeBlock(int64(11), txs, nil, nil), blockSize: 100, }, - wantErr: true, + poolWanted: makeBlockPool(testBcR, 10, + []bpPeer{{id: "P1", height: 100}}, + map[int64]tPBlocks{10: {"P1", false}}), + errWanted: errBadDataFromPeer, }, {name: "unexpected block 10 from known peer - already have 10", pool: makeBlockPool(testBcR, 10, @@ -395,7 +400,24 @@ func TestBlockPoolAddBlock(t *testing.T) { block: types.MakeBlock(int64(10), txs, nil, nil), blockSize: 100, }, - wantErr: true, + poolWanted: makeBlockPool(testBcR, 10, + []bpPeer{{id: "P1", height: 100}}, + map[int64]tPBlocks{10: {"P1", true}}), + errWanted: errBadDataFromPeer, + }, + {name: "unexpected block 10 from known peer P2 - expected 10 to come from P1", + pool: makeBlockPool(testBcR, 10, + []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, + map[int64]tPBlocks{10: {"P1", false}}), + args: args{ + peerID: "P2", + block: types.MakeBlock(int64(10), txs, nil, nil), + blockSize: 100, + }, + poolWanted: makeBlockPool(testBcR, 10, + []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, + map[int64]tPBlocks{10: {"P1", false}}), + errWanted: errBadDataFromPeer, }, {name: "expected block from known peer", pool: makeBlockPool(testBcR, 10, @@ -406,16 +428,18 @@ func TestBlockPoolAddBlock(t *testing.T) { block: types.MakeBlock(int64(10), txs, nil, nil), blockSize: 100, }, - wantErr: false, + poolWanted: makeBlockPool(testBcR, 10, + []bpPeer{{id: "P1", height: 100}}, + map[int64]tPBlocks{10: {"P1", true}}), + errWanted: nil, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pool := tt.pool - if err := pool.addBlock(tt.args.peerID, tt.args.block, tt.args.blockSize); (err != nil) != tt.wantErr { - t.Errorf("blockPool.addBlock() error = %v, wantErr %v", err, tt.wantErr) - } + err := tt.pool.addBlock(tt.args.peerID, tt.args.block, tt.args.blockSize) + assert.Equal(t, tt.errWanted, err) + assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool) }) } } @@ -428,34 +452,30 @@ func TestBlockPoolGetNextTwoBlocks(t *testing.T) { pool *blockPool firstWanted int64 secondWanted int64 - wantErr bool + errWanted error }{ { name: "both blocks missing", pool: makeBlockPool(testBcR, 10, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, map[int64]tPBlocks{15: {"P1", true}, 16: {"P2", true}}), - firstWanted: 0, - secondWanted: 0, - wantErr: true, + errWanted: errMissingBlocks, }, { name: "second block missing", pool: makeBlockPool(testBcR, 15, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, map[int64]tPBlocks{15: {"P1", true}, 18: {"P2", true}}), - firstWanted: 15, - secondWanted: 0, - wantErr: true, + firstWanted: 15, + errWanted: errMissingBlocks, }, { name: "first block missing", pool: makeBlockPool(testBcR, 15, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, - map[int64]tPBlocks{18: {"P1", true}, 16: {"P2", true}}), - firstWanted: 0, + map[int64]tPBlocks{16: {"P2", true}, 18: {"P2", true}}), secondWanted: 16, - wantErr: true, + errWanted: errMissingBlocks, }, { name: "both blocks present", @@ -464,7 +484,6 @@ func TestBlockPoolGetNextTwoBlocks(t *testing.T) { map[int64]tPBlocks{10: {"P1", true}, 11: {"P2", true}}), firstWanted: 10, secondWanted: 11, - wantErr: false, }, } @@ -472,22 +491,20 @@ func TestBlockPoolGetNextTwoBlocks(t *testing.T) { t.Run(tt.name, func(t *testing.T) { pool := tt.pool gotFirst, gotSecond, err := pool.getNextTwoBlocks() - if (err != nil) != tt.wantErr { - t.Errorf("blockPool.getNextTwoBlocks() error = %v, wantErr %v", err, tt.wantErr) - } + assert.Equal(t, tt.errWanted, err) + if tt.firstWanted != 0 { peer := pool.blocks[tt.firstWanted] block := pool.peers[peer].blocks[tt.firstWanted] - if !reflect.DeepEqual(gotFirst.block, block) { - t.Errorf("blockPool.getNextTwoBlocks() gotFirst = %v, want %v", gotFirst.block.Height, tt.firstWanted) - } + assert.Equal(t, gotFirst.block, block, + "blockPool.getNextTwoBlocks() gotFirst = %v, want %v", gotFirst.block.Height, tt.firstWanted) } + if tt.secondWanted != 0 { peer := pool.blocks[tt.secondWanted] block := pool.peers[peer].blocks[tt.secondWanted] - if !reflect.DeepEqual(gotSecond.block, block) { - t.Errorf("blockPool.getNextTwoBlocks() gotFirst = %v, want %v", gotSecond.block.Height, tt.secondWanted) - } + assert.Equal(t, gotSecond.block, block, + "blockPool.getNextTwoBlocks() gotFirst = %v, want %v", gotSecond.block.Height, tt.secondWanted) } }) } @@ -497,75 +514,130 @@ func TestBlockPoolInvalidateFirstTwoBlocks(t *testing.T) { testBcR := newTestBcR() tests := []struct { - name string - pool *blockPool - firstWanted int64 - secondWanted int64 - wantChange bool + name string + pool *blockPool + poolWanted *blockPool }{ { name: "both blocks missing", pool: makeBlockPool(testBcR, 10, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, map[int64]tPBlocks{15: {"P1", true}, 16: {"P2", true}}), - firstWanted: 0, - secondWanted: 0, - wantChange: false, + poolWanted: makeBlockPool(testBcR, 10, + []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, + map[int64]tPBlocks{15: {"P1", true}, 16: {"P2", true}}), }, { name: "second block missing", pool: makeBlockPool(testBcR, 15, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, map[int64]tPBlocks{15: {"P1", true}, 18: {"P2", true}}), - firstWanted: 15, - secondWanted: 0, - wantChange: true, + poolWanted: makeBlockPool(testBcR, 15, + []bpPeer{{id: "P2", height: 100}}, + map[int64]tPBlocks{18: {"P2", true}}), }, { name: "first block missing", pool: makeBlockPool(testBcR, 15, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, map[int64]tPBlocks{18: {"P1", true}, 16: {"P2", true}}), - firstWanted: 0, - secondWanted: 16, - wantChange: true, + poolWanted: makeBlockPool(testBcR, 15, + []bpPeer{{id: "P1", height: 100}}, + map[int64]tPBlocks{18: {"P1", true}}), }, { name: "both blocks present", pool: makeBlockPool(testBcR, 10, []bpPeer{{id: "P1", height: 100}, {id: "P2", height: 100}}, map[int64]tPBlocks{10: {"P1", true}, 11: {"P2", true}}), - firstWanted: 10, - secondWanted: 11, - wantChange: true, + poolWanted: makeBlockPool(testBcR, 10, + []bpPeer{}, + map[int64]tPBlocks{}), }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pool := tt.pool - gotFirst, gotSecond, _ := pool.getNextTwoBlocks() - - beforePool := poolCopy(pool) - pool.invalidateFirstTwoBlocks(errNoPeerResponse) - if !tt.wantChange { - assert.Equal(t, len(beforePool.peers), len(pool.peers)) - return - } - if tt.firstWanted != 0 { - _, ok := pool.peers[gotFirst.peer.id] - assert.False(t, ok) - _, ok = pool.blocks[tt.firstWanted] - assert.False(t, ok) - assert.True(t, pool.requests[tt.firstWanted]) - } - if tt.secondWanted != 0 { - _, ok := pool.peers[gotSecond.peer.id] - assert.False(t, ok) - _, ok = pool.blocks[tt.secondWanted] - assert.False(t, ok) - assert.True(t, pool.requests[tt.secondWanted]) - } + tt.pool.invalidateFirstTwoBlocks(errNoPeerResponse) + assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool) + }) + } +} + +func TestProcessedCurrentHeightBlock(t *testing.T) { + testBcR := newTestBcR() + + tests := []struct { + name string + pool *blockPool + poolWanted *blockPool + }{ + { + name: "one peer", + pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, + map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", true}}), + poolWanted: makeBlockPool(testBcR, 101, []bpPeer{{id: "P1", height: 120}}, + map[int64]tPBlocks{101: {"P1", true}}), + }, + { + name: "multiple peers", + pool: makeBlockPool(testBcR, 100, + []bpPeer{{id: "P1", height: 120}, {id: "P2", height: 120}, {id: "P3", height: 130}}, + map[int64]tPBlocks{ + 100: {"P1", true}, 104: {"P1", true}, 105: {"P1", false}, + 101: {"P2", true}, 103: {"P2", false}, + 102: {"P3", true}, 106: {"P3", true}}), + poolWanted: makeBlockPool(testBcR, 101, + []bpPeer{{id: "P1", height: 120}, {id: "P2", height: 120}, {id: "P3", height: 130}}, + map[int64]tPBlocks{ + 104: {"P1", true}, 105: {"P1", false}, + 101: {"P2", true}, 103: {"P2", false}, + 102: {"P3", true}, 106: {"P3", true}}), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.pool.processedCurrentHeightBlock() + assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool) + }) + } +} + +func TestRemovePeerAtCurrentHeight(t *testing.T) { + testBcR := newTestBcR() + + tests := []struct { + name string + pool *blockPool + poolWanted *blockPool + }{ + { + name: "one peer", + pool: makeBlockPool(testBcR, 100, []bpPeer{{id: "P1", height: 120}}, + map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", true}}), + poolWanted: makeBlockPool(testBcR, 101, []bpPeer{}, map[int64]tPBlocks{}), + }, + { + name: "multiple peers", + pool: makeBlockPool(testBcR, 100, + []bpPeer{{id: "P1", height: 120}, {id: "P2", height: 120}, {id: "P3", height: 130}}, + map[int64]tPBlocks{ + 100: {"P1", true}, 104: {"P1", true}, 105: {"P1", false}, + 101: {"P2", true}, 103: {"P2", false}, + 102: {"P3", true}, 106: {"P3", true}}), + poolWanted: makeBlockPool(testBcR, 101, + []bpPeer{{id: "P2", height: 120}, {id: "P3", height: 130}}, + map[int64]tPBlocks{ + 101: {"P2", true}, 103: {"P2", false}, + 102: {"P3", true}, 106: {"P3", true}}), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.pool.removePeerAtCurrentHeight(errNoPeerResponse) + assertBlockPoolEquivalent(t, tt.poolWanted, tt.pool) }) } } diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 90ad5ff2..3c2b1b2e 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -194,24 +194,25 @@ func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcStatusRequestMessa } func (bcR *BlockchainReactor) sendMessageToFSMAsync(msg bReactorMessageData) { - bcR.Logger.Error("send message to FSM for processing", "msg", msg.String()) + bcR.Logger.Debug("send message to FSM for processing", "msg", msg.String()) bcR.messagesForFSMCh <- msg } -func (bcR *BlockchainReactor) sendRemovePeerToFSM(peerID p2p.ID) { - msgData := bReactorMessageData{ - event: peerRemoveEv, - data: bReactorEventData{ - peerId: peerID, - err: errSwitchRemovesPeer, - }, - } - bcR.sendMessageToFSMAsync(msgData) +func (bcR *BlockchainReactor) sendErrorToFSMAsync(msg bReactorMessageData) { + bcR.Logger.Debug("send message to FSM for processing", "msg", msg.String()) + bcR.errorsForFSMCh <- msg } // RemovePeer implements Reactor by removing peer from the pool. func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { - bcR.sendRemovePeerToFSM(peer.ID()) + msgData := bReactorMessageData{ + event: peerRemoveEv, + data: bReactorEventData{ + peerId: peer.ID(), + err: errSwitchRemovesPeer, + }, + } + bcR.sendErrorToFSMAsync(msgData) } // Receive implements Reactor by handling 4 types of messages (look below). @@ -280,8 +281,8 @@ func (bcR *BlockchainReactor) poolRoutine() { bcR.fsm.start() - trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) - trySendTicker := time.NewTicker(trySendIntervalMS * time.Millisecond) + processReceivedBlockTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) + sendBlockRequestTicker := time.NewTicker(trySendIntervalMS * time.Millisecond) statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second) @@ -289,20 +290,13 @@ func (bcR *BlockchainReactor) poolRoutine() { lastHundred := time.Now() lastRate := 0.0 - doProcessCh := make(chan struct{}, 1) - doSendCh := make(chan struct{}, 1) + doProcessBlockCh := make(chan struct{}, 1) ForLoop: for { select { - case <-trySendTicker.C: // chan time - select { - case doSendCh <- struct{}{}: - default: - } - - case <-doSendCh: + case <-sendBlockRequestTicker.C: // Tell FSM to make more requests. // The maxNumPendingRequests may be changed based on low/ high watermark thresholds for // - the number of blocks received and waiting to be processed, @@ -345,13 +339,13 @@ ForLoop: break ForLoop } - case <-trySyncTicker.C: // chan time + case <-processReceivedBlockTicker.C: // chan time select { - case doProcessCh <- struct{}{}: + case doProcessBlockCh <- struct{}{}: default: } - case <-doProcessCh: + case <-doProcessBlockCh: err := bcR.processBlocksFromPoolRoutine() if err == errMissingBlocks { continue ForLoop @@ -369,7 +363,7 @@ ForLoop: continue ForLoop } - doProcessCh <- struct{}{} + doProcessBlockCh <- struct{}{} bcR.blocksSynced++ if bcR.blocksSynced%100 == 0 { diff --git a/blockchain/reactor_fsm.go b/blockchain/reactor_fsm.go index 5739bb2f..a713aca3 100644 --- a/blockchain/reactor_fsm.go +++ b/blockchain/reactor_fsm.go @@ -141,7 +141,8 @@ var ( // state timers var ( - waitForPeerTimeout = 2 * time.Second + waitForPeerTimeout = 2 * time.Second + waitForBlockAtCurrentHeightTimeout = 5 * time.Second ) // errors @@ -156,7 +157,6 @@ var ( errSendQueueFull = errors.New("block request not made, send-queue is full") errPeerTooShort = errors.New("peer height too low, old peer removed/ new peer not added") errSlowPeer = errors.New("peer is not sending us data fast enough") - errNoPeerFoundForHeight = errors.New("could not find peer for block request") errSwitchRemovesPeer = errors.New("switch is removing peer") errTimeoutEventWrongState = errors.New("timeout event for a different state than the current one") ) @@ -219,7 +219,12 @@ func init() { } waitForBlock = &bReactorFSMState{ - name: "waitForBlock", + name: "waitForBlock", + timeout: waitForBlockAtCurrentHeightTimeout, + enter: func(fsm *bReactorFSM) { + // Stop when leaving the state. + fsm.resetStateTimer() + }, handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) { switch ev { @@ -239,7 +244,6 @@ func init() { fsm.pool.removePeer(data.peerId, err) fsm.toBcR.sendPeerError(err, data.peerId) } - return waitForBlock, err case processedBlockEv: @@ -259,6 +263,7 @@ func init() { fsm.stop() return finished, nil } + fsm.resetStateTimer() } return waitForBlock, data.err @@ -272,7 +277,20 @@ func init() { fsm.makeNextRequests(data.maxNumRequests) return waitForBlock, nil + case stateTimeoutEv: + if data.stateName != "waitForBlock" { + fsm.logger.Error("received a state timeout event for different state", "state", data.stateName) + return waitForBlock, errTimeoutEventWrongState + } + // We haven't received the block at current height. Remove peer. + fsm.pool.removePeerAtCurrentHeight(errNoPeerResponse) + fsm.resetStateTimer() + return waitForBlock, errNoPeerResponse + case stopFSMEv: + if fsm.stateTimer != nil { + fsm.stateTimer.Stop() + } return finished, errNoErrorFinished default: diff --git a/blockchain/reactor_fsm_test.go b/blockchain/reactor_fsm_test.go index 9a3e318a..a4849fca 100644 --- a/blockchain/reactor_fsm_test.go +++ b/blockchain/reactor_fsm_test.go @@ -274,17 +274,13 @@ func TestFSMBasic(t *testing.T) { makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumPendingRequests), // blockResponseEv for height 1 - makeStepBlockRespEv("waitForBlock", "waitForBlock", - "P1", 1, []int64{}), + makeStepBlockRespEv("waitForBlock", "waitForBlock", "P1", 1, []int64{}), // blockResponseEv for height 2 - makeStepBlockRespEv("waitForBlock", "waitForBlock", - "P1", 2, []int64{1}), + makeStepBlockRespEv("waitForBlock", "waitForBlock", "P1", 2, []int64{1}), // blockResponseEv for height 3 - makeStepBlockRespEv("waitForBlock", "waitForBlock", - "P2", 3, []int64{1, 2}), + makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{1, 2}), // blockResponseEv for height 4 - makeStepBlockRespEv("waitForBlock", "waitForBlock", - "P2", 4, []int64{1, 2, 3}), + makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 4, []int64{1, 2, 3}), // processedBlockEv makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil), makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil), @@ -379,11 +375,9 @@ func TestFSMBlockVerificationFailure(t *testing.T) { makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 1, []int64{}), // blockResponseEv for height 2 - makeStepBlockRespEv("waitForBlock", "waitForBlock", - "P2", 2, []int64{1}), + makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 2, []int64{1}), // blockResponseEv for height 3 - makeStepBlockRespEv("waitForBlock", "waitForBlock", - "P2", 3, []int64{1, 2}), + makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{1, 2}), makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil), makeStepProcessedBlockEv("waitForBlock", "finished", nil), @@ -439,10 +433,8 @@ func TestFSMBlockVerificationFailure(t *testing.T) { secondAfter, err2 := testBcR.fsm.pool.getBlockAndPeerAtHeight(testBcR.fsm.pool.height + 1) assert.NotNil(t, err1) assert.NotNil(t, err2) - assert.Nil(t, firstAfter.block) - assert.Nil(t, firstAfter.peer) - assert.Nil(t, secondAfter.block) - assert.Nil(t, secondAfter.peer) + assert.Nil(t, firstAfter) + assert.Nil(t, secondAfter) } assert.Equal(t, step.expectedState, testBcR.fsm.state.name) if step.expectedState == "finished" { @@ -575,6 +567,89 @@ func TestFSMBadBlockFromPeer(t *testing.T) { } } +func TestFSMBlockAtCurrentHeightNeverArrives(t *testing.T) { + tests := []struct { + name string + startingHeight int64 + maxRequestsPerPeer int32 + steps []fsmStepTestValues + }{ + { + name: "block at current height undelivered", + startingHeight: 1, + maxRequestsPerPeer: 3, + steps: []fsmStepTestValues{ + // startFSMEv + makeStepStartFSMEv(), + // statusResponseEv from P1 + makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), + // make some requests + makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumPendingRequests), + // block received for height 1 + makeStepBlockRespEv("waitForBlock", "waitForBlock", + "P1", 1, []int64{}), + // block recevied for height 2 + makeStepBlockRespEv("waitForBlock", "waitForBlock", + "P1", 2, []int64{1}), + // processed block at height 1 + makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil), + + // add peer P2 + makeStepStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil), + // timeout on block at height 3 + makeStepStateTimeoutEv("waitForBlock", "waitForBlock", "waitForBlock", errNoPeerResponse), + + // make some requests (includes redo-s for blocks 2 and 3) + makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumPendingRequests), + // block received for height 2 from P2 + makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 2, []int64{}), + // block received for height 3 from P2 + makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{2}), + makeStepProcessedBlockEv("waitForBlock", "finished", nil), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create test reactor + testBcR := newTestReactor(tt.startingHeight) + resetTestValues() + + if tt.maxRequestsPerPeer != 0 { + maxRequestsPerPeer = tt.maxRequestsPerPeer + } + + for _, step := range tt.steps { + assert.Equal(t, step.currentState, testBcR.fsm.state.name) + + oldNumStatusRequests := numStatusRequests + oldNumBlockRequests := numBlockRequests + + fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) + assert.Equal(t, step.errWanted, fsmErr) + + if step.shouldSendStatusReq { + assert.Equal(t, oldNumStatusRequests+1, numStatusRequests) + } else { + assert.Equal(t, oldNumStatusRequests, numStatusRequests) + } + + if step.blockReqIncreased { + assert.True(t, oldNumBlockRequests < numBlockRequests) + } else { + assert.Equal(t, oldNumBlockRequests, numBlockRequests) + } + + for _, height := range step.blocksAdded { + _, err := testBcR.fsm.pool.getBlockAndPeerAtHeight(height) + assert.Nil(t, err) + } + assert.Equal(t, step.expectedState, testBcR.fsm.state.name) + } + }) + } +} + func TestFSMPeerRelatedErrors(t *testing.T) { tests := []struct { name string diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 25e7c072..238ceefd 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -66,7 +66,7 @@ type BlockchainReactorPair struct { conR *consensusReactorTest } -func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) BlockchainReactorPair { +func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) *BlockchainReactor { if len(privVals) != 1 { panic("only support one validator") } @@ -122,10 +122,17 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor.SetLogger(logger.With("module", "blockchain")) + return bcReactor +} + +func newBlockchainReactorPair(logger log.Logger, genDoc *types.GenesisDoc, privVals []types.PrivValidator, maxBlockHeight int64) BlockchainReactorPair { + consensusReactor := &consensusReactorTest{} consensusReactor.BaseReactor = *p2p.NewBaseReactor("Consensus reactor", consensusReactor) - return BlockchainReactorPair{bcReactor, consensusReactor} + return BlockchainReactorPair{ + newBlockchainReactor(logger, genDoc, privVals, maxBlockHeight), + consensusReactor} } type consensusReactorTest struct { @@ -151,8 +158,8 @@ func TestFastSyncNoBlockResponse(t *testing.T) { reactorPairs := make([]BlockchainReactorPair, 2) logger := log.TestingLogger() - reactorPairs[0] = newBlockchainReactor(logger, genDoc, privVals, maxBlockHeight) - reactorPairs[1] = newBlockchainReactor(logger, genDoc, privVals, 0) + reactorPairs[0] = newBlockchainReactorPair(logger, genDoc, privVals, maxBlockHeight) + reactorPairs[1] = newBlockchainReactorPair(logger, genDoc, privVals, 0) p2p.MakeConnectedSwitches(config.P2P, 2, func(i int, s *p2p.Switch) *p2p.Switch { s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR) @@ -209,7 +216,6 @@ func TestFastSyncNoBlockResponse(t *testing.T) { // Alternatively we could actually dial a TCP conn but // that seems extreme. func TestFastSyncBadBlockStopsPeer(t *testing.T) { - var mtx sync.Mutex numNodes := 4 maxBlockHeight := int64(148) @@ -217,7 +223,7 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) { defer os.RemoveAll(config.RootDir) genDoc, privVals := randGenesisDoc(1, false, 30) - otherChain := newBlockchainReactor(log.TestingLogger(), genDoc, privVals, maxBlockHeight) + otherChain := newBlockchainReactorPair(log.TestingLogger(), genDoc, privVals, maxBlockHeight) defer func() { _ = otherChain.bcR.Stop() _ = otherChain.conR.Stop() @@ -232,10 +238,9 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) { if i == 0 { height = maxBlockHeight } - reactorPairs[i] = newBlockchainReactor(logger[i], genDoc, privVals, height) + reactorPairs[i] = newBlockchainReactorPair(logger[i], genDoc, privVals, height) } - mtx.Lock() switches := p2p.MakeConnectedSwitches(config.P2P, numNodes, func(i int, s *p2p.Switch) *p2p.Switch { reactorPairs[i].conR.mtx.Lock() s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR) @@ -246,7 +251,6 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) { return s }, p2p.Connect2Switches) - mtx.Unlock() defer func() { for _, r := range reactorPairs { @@ -255,30 +259,30 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) { } }() +outerFor: for { time.Sleep(1 * time.Second) - reactorPairs[1].conR.mtx.Lock() - if reactorPairs[1].conR.switchedToConsensus { - reactorPairs[1].conR.mtx.Unlock() - break - } - reactorPairs[1].conR.mtx.Unlock() - if reactorPairs[numNodes-1].bcR.Switch.Peers().Size() == 0 { - break + for i := 0; i < numNodes; i++ { + reactorPairs[i].conR.mtx.Lock() + if !reactorPairs[i].conR.switchedToConsensus { + reactorPairs[i].conR.mtx.Unlock() + continue outerFor + } + reactorPairs[i].conR.mtx.Unlock() } + + break outerFor } //at this time, reactors[0-3] is the newest assert.Equal(t, numNodes-1, reactorPairs[1].bcR.Switch.Peers().Size()) //mark last reactorPair as an invalid peer - mtx.Lock() reactorPairs[numNodes-1].bcR.store = otherChain.bcR.store - mtx.Unlock() lastLogger := log.TestingLogger() - lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0) + lastReactorPair := newBlockchainReactorPair(lastLogger, genDoc, privVals, 0) reactorPairs = append(reactorPairs, lastReactorPair) switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch { @@ -310,6 +314,7 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) { } assert.True(t, lastReactorPair.bcR.Switch.Peers().Size() < len(reactorPairs)-1) + fmt.Println("TestFastSyncBadBlockStopsPeer finished") } func setupReactors( @@ -328,7 +333,7 @@ func setupReactors( if i == 0 { height = maxBlockHeight } - reactorPairs[i] = newBlockchainReactor(logger[i], genDoc, privVals, height) + reactorPairs[i] = newBlockchainReactorPair(logger[i], genDoc, privVals, height) } switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch { @@ -385,7 +390,7 @@ outerFor: assert.Equal(t, numNodes-1, reactorPairs[0].bcR.Switch.Peers().Size()) lastLogger := log.TestingLogger() - lastReactorPair := newBlockchainReactor(lastLogger, genDoc, privVals, 0) + lastReactorPair := newBlockchainReactorPair(lastLogger, genDoc, privVals, 0) reactorPairs = append(reactorPairs, lastReactorPair) switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {