373 lines
11 KiB
Go
Raw Normal View History

package blockchain
2015-03-22 03:30:22 -07:00
import (
"fmt"
2019-04-13 09:23:43 -04:00
"sort"
2015-03-22 03:30:22 -07:00
2018-07-01 22:36:49 -04:00
"github.com/tendermint/tendermint/libs/log"
2018-01-03 11:29:19 +01:00
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
2015-03-22 03:30:22 -07:00
)
2019-04-13 09:23:43 -04:00
type blockData struct {
block *types.Block
peer *bpPeer
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
func (bd *blockData) String() string {
if bd == nil {
return fmt.Sprintf("blockData nil")
}
if bd.block == nil {
if bd.peer == nil {
return fmt.Sprintf("block: nil peer: nil")
}
return fmt.Sprintf("block: nil peer: %v", bd.peer.id)
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
return fmt.Sprintf("block: %v peer: %v", bd.block.Height, bd.peer.id)
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
type blockPool struct {
logger log.Logger
2019-05-15 18:33:12 -04:00
// Set of peers that have sent status responses, with height bigger than pool.height
peers map[p2p.ID]*bpPeer
// Set of block heights and the corresponding peers from where a block response is expected or has been received.
2019-04-13 09:23:43 -04:00
blocks map[int64]p2p.ID
2019-05-15 23:33:19 -04:00
plannedRequests map[int64]struct{} // list of blocks to be assigned peers for blockRequest
nextRequestHeight int64 // next height to be added to plannedRequests
2019-04-13 09:23:43 -04:00
2019-05-15 18:33:12 -04:00
height int64 // height of next block to execute
2019-04-13 09:23:43 -04:00
maxPeerHeight int64 // maximum height of all peers
toBcR bcRMessageInterface
}
func newBlockPool(height int64, toBcR bcRMessageInterface) *blockPool {
return &blockPool{
peers: make(map[p2p.ID]*bpPeer),
maxPeerHeight: 0,
blocks: make(map[int64]p2p.ID),
2019-05-15 23:33:19 -04:00
plannedRequests: make(map[int64]struct{}),
2019-04-13 09:23:43 -04:00
nextRequestHeight: height,
height: height,
toBcR: toBcR,
}
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
func (pool *blockPool) String() string {
peerStr := fmt.Sprintf("Pool Peers:")
for _, p := range pool.peers {
peerStr += fmt.Sprintf("%v,", p)
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
return peerStr
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
func (pool *blockPool) setLogger(l log.Logger) {
pool.logger = l
2015-09-09 21:44:48 -07:00
}
2019-04-13 09:23:43 -04:00
func (pool *blockPool) reachedMaxHeight() bool {
2019-05-15 18:33:12 -04:00
return pool.height >= pool.maxPeerHeight
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
func (pool *blockPool) rescheduleRequest(peerID p2p.ID, height int64) {
pool.logger.Info("reschedule requests made to peer for height ", "peerID", peerID, "height", height)
2019-05-15 23:33:19 -04:00
pool.plannedRequests[height] = struct{}{}
2019-04-13 09:23:43 -04:00
delete(pool.blocks, height)
delete(pool.peers[peerID].blocks, height)
}
2015-03-24 11:02:30 -07:00
2019-04-13 09:23:43 -04:00
// Updates the pool's max height. If no peers are left maxPeerHeight is set to 0.
func (pool *blockPool) updateMaxPeerHeight() {
var newMax int64
2019-04-13 09:23:43 -04:00
for _, peer := range pool.peers {
if peer.height > newMax {
newMax = peer.height
}
2015-03-22 03:30:22 -07:00
}
pool.maxPeerHeight = newMax
2015-03-22 12:46:53 -07:00
}
2019-04-13 09:23:43 -04:00
// Adds a new peer or updates an existing peer with a new height.
// If a new peer is too short it is not added.
2019-04-13 09:23:43 -04:00
func (pool *blockPool) updatePeer(peerID p2p.ID, height int64) error {
peer := pool.peers[peerID]
oldHeight := int64(0)
if peer != nil {
oldHeight = peer.height
}
2019-05-15 23:33:19 -04:00
pool.logger.Info("updatePeer", "peerID", peerID, "height", height, "old_height", oldHeight)
2015-03-22 03:30:22 -07:00
2019-04-13 09:23:43 -04:00
if peer == nil {
if height < pool.height {
pool.logger.Info("Peer height too small", "peer", peerID, "height", height, "fsm_height", pool.height)
return errPeerTooShort
}
2019-04-13 09:23:43 -04:00
// Add new peer.
peer = newBPPeer(peerID, height, pool.toBcR.sendPeerError)
peer.setLogger(pool.logger.With("peer", peerID))
pool.peers[peerID] = peer
2019-04-13 09:23:43 -04:00
} else {
// Check if peer is lowering its height. This is not allowed.
if height < peer.height {
pool.removePeer(peerID, errPeerLowersItsHeight)
return errPeerLowersItsHeight
2019-04-13 09:23:43 -04:00
}
// Update existing peer.
2019-04-13 09:23:43 -04:00
peer.height = height
2015-03-24 11:02:30 -07:00
}
2019-05-15 18:33:12 -04:00
// Update the pool's maxPeerHeight if needed. Note that for updates it can only increase or left unchanged.
2019-04-13 09:23:43 -04:00
pool.updateMaxPeerHeight()
2015-03-24 11:02:30 -07:00
2019-04-13 09:23:43 -04:00
return nil
2015-03-22 12:46:53 -07:00
}
2019-04-13 09:23:43 -04:00
// Stops the peer timer and deletes the peer. Recomputes the max peer height.
2019-05-15 18:33:12 -04:00
func (pool *blockPool) deletePeer(peer *bpPeer) {
if peer == nil {
return
}
if peer.timeout != nil {
peer.timeout.Stop()
}
delete(pool.peers, peer.id)
2019-05-15 18:33:12 -04:00
if peer.height == pool.maxPeerHeight {
pool.updateMaxPeerHeight()
}
}
2019-04-13 09:23:43 -04:00
// Removes any blocks and requests associated with the peer and deletes the peer.
// Also triggers new requests if blocks have been removed.
func (pool *blockPool) removePeer(peerID p2p.ID, err error) {
peer := pool.peers[peerID]
if peer == nil {
return
}
2019-05-15 23:33:19 -04:00
pool.logger.Info("removing peer", "peerID", peerID, "error", err)
2019-05-15 18:33:12 -04:00
// Reschedule the block requests made to the peer, or received and not processed yet.
// Note that some of the requests may be removed further down.
2019-05-20 23:32:59 -04:00
for h := range pool.peers[peerID].blocks {
2019-04-13 09:23:43 -04:00
pool.rescheduleRequest(peerID, h)
}
2015-03-22 12:46:53 -07:00
2019-05-15 18:33:12 -04:00
oldMaxPeerHeight := pool.maxPeerHeight
// Delete the peer. This operation may result in the pool's maxPeerHeight being lowered.
pool.deletePeer(peer)
// Check if the pool's maxPeerHeight has been lowered.
// This may happen if the tallest peer has been removed.
if oldMaxPeerHeight > pool.maxPeerHeight {
// Remove any planned requests for heights over the new maxPeerHeight.
2019-05-15 23:33:19 -04:00
for h := range pool.plannedRequests {
2019-05-15 18:33:12 -04:00
if h > pool.maxPeerHeight {
2019-05-15 23:33:19 -04:00
delete(pool.plannedRequests, h)
2019-05-15 18:33:12 -04:00
}
}
// Adjust the nextRequestHeight to the new max plus one.
if pool.nextRequestHeight > pool.maxPeerHeight {
pool.nextRequestHeight = pool.maxPeerHeight + 1
}
}
2019-04-13 09:23:43 -04:00
}
2015-03-24 11:02:30 -07:00
2019-04-13 09:23:43 -04:00
// Called every time FSM advances its height.
func (pool *blockPool) removeShortPeers() {
for _, peer := range pool.peers {
2019-04-13 09:23:43 -04:00
if peer.height < pool.height {
pool.removePeer(peer.id, nil)
}
2015-03-22 03:30:22 -07:00
}
}
2019-04-15 23:26:07 -04:00
func (pool *blockPool) removeBadPeers() {
pool.removeShortPeers()
for _, peer := range pool.peers {
if err := peer.isGood(); err != nil {
pool.removePeer(peer.id, err)
2019-05-15 18:33:12 -04:00
peer.errFunc(err, peer.id)
2019-04-15 23:26:07 -04:00
}
}
}
2019-05-15 23:33:19 -04:00
// Make a batch of requests sorted by height. The parameter 'maxNumRequests' includes the number of block requests
// already made.
func (pool *blockPool) makeRequestBatch(maxNumRequests int32) []int {
2019-04-15 23:26:07 -04:00
pool.removeBadPeers()
2019-05-15 23:33:19 -04:00
// At this point pool.requests may include heights for requests to be redone due to removal of peers:
// - peers timed out or were removed by switch
// - FSM timed out on waiting to advance the block execution due to missing blocks at h or h+1
// Check if more requests should be tried by subtracting the number of requests already made from the maximum allowed
numNeeded := int(maxNumRequests) - len(pool.blocks)
for len(pool.plannedRequests) < numNeeded {
2019-04-15 23:26:07 -04:00
if pool.nextRequestHeight > pool.maxPeerHeight {
break
}
2019-05-15 23:33:19 -04:00
pool.plannedRequests[pool.nextRequestHeight] = struct{}{}
2019-04-15 23:26:07 -04:00
pool.nextRequestHeight++
}
2019-05-15 23:33:19 -04:00
heights := make([]int, 0, len(pool.plannedRequests))
for k := range pool.plannedRequests {
2019-04-15 23:26:07 -04:00
heights = append(heights, int(k))
}
sort.Ints(heights)
return heights
}
func (pool *blockPool) makeNextRequests(maxNumRequests int32) {
heights := pool.makeRequestBatch(maxNumRequests)
pool.logger.Info("makeNextRequests will make following requests", "number", len(heights), "heights", heights)
2019-04-15 23:26:07 -04:00
for _, height := range heights {
h := int64(height)
if !pool.sendRequest(h) {
2019-05-15 23:33:19 -04:00
// If a good peer was not found for sending the request at height h then return,
// as it shouldn't be possible to find a peer for h+1.
2019-04-15 23:26:07 -04:00
return
}
2019-05-15 23:33:19 -04:00
delete(pool.plannedRequests, h)
2019-04-15 23:26:07 -04:00
}
}
func (pool *blockPool) sendRequest(height int64) bool {
2019-04-15 23:26:07 -04:00
for _, peer := range pool.peers {
if peer.numPending >= int32(maxRequestsPerPeer) {
continue
}
if peer.height < height {
continue
}
2019-05-07 21:06:43 -04:00
2019-05-15 23:33:19 -04:00
err := pool.toBcR.sendBlockRequest(peer.id, height)
if err == errNilPeerForBlockRequest {
// Switch does not have this peer, remove it and continue to look for another peer.
2019-05-15 23:33:19 -04:00
pool.logger.Error("switch does not have peer..removing peer selected for height", "peer", peer.id, "height", height)
2019-04-15 23:26:07 -04:00
pool.removePeer(peer.id, err)
continue
}
2019-05-15 23:33:19 -04:00
if err == errSendQueueFull {
pool.logger.Error("peer queue is full", "peer", peer.id, "height", height)
continue
2019-04-15 23:26:07 -04:00
}
2019-05-15 23:33:19 -04:00
pool.logger.Info("assigned request to peer", "peer", peer.id, "height", height)
2019-04-15 23:26:07 -04:00
pool.blocks[height] = peer.id
peer.blocks[height] = nil
peer.incrPending()
return true
2019-04-15 23:26:07 -04:00
}
pool.logger.Error("could not find peer to send request for block at height", "height", height)
return false
2019-04-15 23:26:07 -04:00
}
2019-04-13 09:23:43 -04:00
// Validates that the block comes from the peer it was expected from and stores it in the 'blocks' map.
func (pool *blockPool) addBlock(peerID p2p.ID, block *types.Block, blockSize int) error {
2019-05-15 18:33:12 -04:00
peer, ok := pool.peers[peerID]
if !ok {
2019-05-15 23:33:19 -04:00
pool.logger.Error("peer does not exist in the pool", "peer", peerID, "block_received", block.Height)
2019-04-13 09:23:43 -04:00
return errBadDataFromPeer
}
2019-04-13 09:23:43 -04:00
b, ok := pool.peers[peerID].blocks[block.Height]
if !ok {
pool.logger.Error("peer sent us a block we didn't expect", "peer", peerID, "blockHeight", block.Height)
if expPeerID, pok := pool.blocks[block.Height]; pok {
pool.logger.Error("expected this block from peer", "peer", expPeerID)
}
return errBadDataFromPeer
2017-09-06 13:11:47 -04:00
}
2019-04-13 09:23:43 -04:00
if b != nil {
pool.logger.Error("already have a block for height", "height", block.Height)
return errBadDataFromPeer
2015-03-22 03:30:22 -07:00
}
2019-05-15 18:33:12 -04:00
peer.blocks[block.Height] = block
peer.decrPending(blockSize)
2019-05-15 23:33:19 -04:00
pool.logger.Info("added new block", "height", block.Height, "from_peer", peerID,
"total_pool_blocks", len(pool.blocks), "peer_numPending", peer.numPending)
2019-04-13 09:23:43 -04:00
return nil
2015-03-24 11:02:30 -07:00
}
2019-04-13 09:23:43 -04:00
func (pool *blockPool) getBlockAndPeerAtHeight(height int64) (bData *blockData, err error) {
peerID := pool.blocks[height]
peer := pool.peers[peerID]
if peer == nil {
return nil, errMissingBlocks
2019-02-06 18:20:10 +04:00
}
2015-03-22 03:30:22 -07:00
2019-04-13 09:23:43 -04:00
block, ok := peer.blocks[height]
if !ok || block == nil {
return nil, errMissingBlocks
2019-04-13 09:23:43 -04:00
}
2016-06-28 18:02:27 -07:00
2019-04-13 09:23:43 -04:00
return &blockData{peer: peer, block: block}, nil
2017-05-02 11:53:32 +04:00
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
func (pool *blockPool) getNextTwoBlocks() (first, second *blockData, err error) {
first, err = pool.getBlockAndPeerAtHeight(pool.height)
second, err2 := pool.getBlockAndPeerAtHeight(pool.height + 1)
if err == nil {
err = err2
}
2019-04-13 09:23:43 -04:00
return
2017-05-02 11:53:32 +04:00
}
// Remove the peers that sent us the first two blocks, blocks are removed by removePeer().
2019-04-13 09:23:43 -04:00
func (pool *blockPool) invalidateFirstTwoBlocks(err error) {
first, err1 := pool.getBlockAndPeerAtHeight(pool.height)
second, err2 := pool.getBlockAndPeerAtHeight(pool.height + 1)
2019-04-13 09:23:43 -04:00
if err1 == nil {
pool.removePeer(first.peer.id, err)
}
2019-04-13 09:23:43 -04:00
if err2 == nil {
pool.removePeer(second.peer.id, err)
}
}
2019-04-13 09:23:43 -04:00
func (pool *blockPool) processedCurrentHeightBlock() {
peerID, peerOk := pool.blocks[pool.height]
if peerOk {
delete(pool.peers[peerID].blocks, pool.height)
}
2019-04-13 09:23:43 -04:00
delete(pool.blocks, pool.height)
pool.logger.Debug("processed and removed block at height", "height", pool.height)
pool.height++
pool.removeShortPeers()
}
// This function is called when the FSM is not able to make progress for a certain amount of time.
// This happens if the block at either pool.height or pool.height+1 has not been delivered during this time.
func (pool *blockPool) removePeerAtCurrentHeights(err error) {
2019-05-07 21:06:43 -04:00
peerID := pool.blocks[pool.height]
peer, ok := pool.peers[peerID]
if ok && peer.blocks[pool.height] == nil {
2019-05-15 23:33:19 -04:00
pool.logger.Info("removing peer that hasn't sent block at pool.height",
"peer", peer.id, "height", pool.height)
pool.removePeer(peer.id, err)
2019-05-07 21:06:43 -04:00
return
}
peerID = pool.blocks[pool.height+1]
peer, ok = pool.peers[peerID]
if ok && peer.blocks[pool.height+1] == nil {
2019-05-15 23:33:19 -04:00
pool.logger.Info("removing peer that hasn't sent block at pool.height+1",
"peer", peer.id, "height", pool.height+1)
pool.removePeer(peer.id, err)
return
}
pool.logger.Info("no peers assigned to blocks at current height or blocks already delivered",
"height", pool.height)
}
2019-04-15 23:26:07 -04:00
func (pool *blockPool) cleanup() {
2019-04-13 09:23:43 -04:00
for _, peer := range pool.peers {
2019-05-15 18:33:12 -04:00
peer.cleanup()
}
}