373 lines
10 KiB
Go
Raw Normal View History

package blockchain
2015-03-22 03:30:22 -07:00
import (
"fmt"
2019-04-13 09:23:43 -04:00
"sort"
2015-03-22 03:30:22 -07:00
2018-07-01 22:36:49 -04:00
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
2018-01-03 11:29:19 +01:00
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
2015-03-22 03:30:22 -07:00
)
2019-04-13 09:23:43 -04:00
type blockData struct {
block *types.Block
peer *bpPeer
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
func (bd *blockData) String() string {
if bd == nil {
return fmt.Sprintf("blockData nil")
}
if bd.block == nil {
if bd.peer == nil {
return fmt.Sprintf("block: nil peer: nil")
}
return fmt.Sprintf("block: nil peer: %v", bd.peer.id)
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
return fmt.Sprintf("block: %v peer: %v", bd.block.Height, bd.peer.id)
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
type blockPool struct {
logger log.Logger
peers map[p2p.ID]*bpPeer
blocks map[int64]p2p.ID
requests map[int64]bool // list of blocks to be assigned peers for blockRequest
nextRequestHeight int64 // next height to be added to requests
2019-04-13 09:23:43 -04:00
height int64 // processing height
maxPeerHeight int64 // maximum height of all peers
numPending int32 // total numPending across peers
toBcR bcRMessageInterface
}
func newBlockPool(height int64, toBcR bcRMessageInterface) *blockPool {
return &blockPool{
peers: make(map[p2p.ID]*bpPeer),
maxPeerHeight: 0,
blocks: make(map[int64]p2p.ID),
requests: make(map[int64]bool),
nextRequestHeight: height,
height: height,
toBcR: toBcR,
}
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
func (pool *blockPool) String() string {
peerStr := fmt.Sprintf("Pool Peers:")
for _, p := range pool.peers {
peerStr += fmt.Sprintf("%v,", p)
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
return peerStr
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
func (pool *blockPool) setLogger(l log.Logger) {
pool.logger = l
2015-09-09 21:44:48 -07:00
}
2019-04-13 09:23:43 -04:00
func (pool blockPool) getMaxPeerHeight() int64 {
return pool.maxPeerHeight
2015-03-24 11:02:30 -07:00
}
2015-03-22 16:23:24 -07:00
2019-04-13 09:23:43 -04:00
func (pool *blockPool) reachedMaxHeight() bool {
return pool.maxPeerHeight == 0 || pool.height >= pool.maxPeerHeight
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
// rescheduleRequest puts height back into the set of planned requests and
// clears its current assignment, both in the pool and on the given peer.
// The peer must be present in pool.peers.
func (pool *blockPool) rescheduleRequest(peerID p2p.ID, height int64) {
	pool.logger.Debug("reschedule requests made to peer for height ", "peerID", peerID, "height", height)

	// Plan the height again and drop the pool-wide assignment.
	pool.requests[height] = true
	delete(pool.blocks, height)

	// Drop the peer-side entry, whether the block was received or not.
	assigned := pool.peers[peerID]
	delete(assigned.blocks, height)
}
2015-03-24 11:02:30 -07:00
2019-04-13 09:23:43 -04:00
// Updates the pool's max height. If no peers are left maxPeerHeight is set to 0.
func (pool *blockPool) updateMaxPeerHeight() {
var newMax int64
2019-04-13 09:23:43 -04:00
for _, peer := range pool.peers {
if peer.height > newMax {
newMax = peer.height
}
2015-03-22 03:30:22 -07:00
}
if newMax < pool.maxPeerHeight {
// Remove any planned requests for heights over the new maxPeerHeight.
// This may happen if a peer has updated with lower height.
for h := range pool.requests {
if h > newMax {
delete(pool.requests, h)
}
}
}
if pool.nextRequestHeight > newMax {
pool.nextRequestHeight = newMax + 1
}
pool.maxPeerHeight = newMax
2015-03-22 12:46:53 -07:00
}
2019-04-13 09:23:43 -04:00
// Adds a new peer or updates an existing peer with a new height.
// If the peer is too short it is removed.
func (pool *blockPool) updatePeer(peerID p2p.ID, height int64) error {
pool.logger.Debug("updatePeer", "peerID", peerID, "height", height)
peer := pool.peers[peerID]
2015-03-22 03:30:22 -07:00
2019-04-13 09:23:43 -04:00
if height < pool.height {
pool.logger.Info("Peer height too small", "peer", peerID, "height", height, "fsm_height", pool.height)
2015-03-22 12:46:53 -07:00
2019-04-13 09:23:43 -04:00
// Don't add or update a peer that is not useful.
fix invalid memory address or nil pointer dereference error (Refs #762) https://github.com/tendermint/tendermint/issues/762#issuecomment-338276055 ``` E[10-19|04:52:38.969] Stopping peer for error module=p2p peer="Peer{MConn{178.62.46.14:46656} B14916FAF38A out}" err="Error: runtime error: invalid memory address or nil pointer dereference\nStack: goroutine 529485 [running]:\nruntime/debug.Stack(0xc4355cfb38, 0xb463e0, 0x11b1c30)\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0xa7\ngithub.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p.(*MConnection)._recover(0xc439a28870)\n\t/home/ubuntu/go/src/github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p/connection.go:206 +0x6e\npanic(0xb463e0, 0x11b1c30)\n\t/usr/local/go/src/runtime/panic.go:491 +0x283\ngithub.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/blockchain.(*bpPeer).decrPending(0x0, 0x381)\n\t/home/ubuntu/go/src/github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/blockchain/pool.go:376 +0x22\ngithub.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/blockchain.(*BlockPool).AddBlock(0xc4200e4000, 0xc4266d1f00, 0x40, 0xc432ac9640, 0x381)\n\t/home/ubuntu/go/src/github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/blockchain/pool.go:215 +0x139\ngithub.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/blockchain.(*BlockchainReactor).Receive(0xc42050a780, 0xc420257740, 0x1171be0, 0xc42ff302d0, 0xc4384b2000, 0x381, 0x1000)\n\t/home/ubuntu/go/src/github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/blockchain/reactor.go:160 +0x712\ngithub.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p.createMConnection.func1(0x11e5040, 0xc4384b2000, 0x381, 0x1000)\n\t/home/ubuntu/go/src/github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p/peer.go:334 
+0xbd\ngithub.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p.(*MConnection).recvRoutine(0xc439a28870)\n\t/home/ubuntu/go/src/github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p/connection.go:475 +0x4a3\ncreated by github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p.(*MConnection).OnStart\n\t/home/ubuntu/go/src/github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p/connection.go:170 +0x187\n" ```
2017-10-20 21:56:10 +04:00
if peer != nil {
2019-04-13 09:23:43 -04:00
pool.logger.Info("remove short peer", "peer", peerID, "height", height, "fsm_height", pool.height)
pool.removePeer(peerID, errPeerTooShort)
fix invalid memory address or nil pointer dereference error (Refs #762) https://github.com/tendermint/tendermint/issues/762#issuecomment-338276055 ``` E[10-19|04:52:38.969] Stopping peer for error module=p2p peer="Peer{MConn{178.62.46.14:46656} B14916FAF38A out}" err="Error: runtime error: invalid memory address or nil pointer dereference\nStack: goroutine 529485 [running]:\nruntime/debug.Stack(0xc4355cfb38, 0xb463e0, 0x11b1c30)\n\t/usr/local/go/src/runtime/debug/stack.go:24 +0xa7\ngithub.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p.(*MConnection)._recover(0xc439a28870)\n\t/home/ubuntu/go/src/github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p/connection.go:206 +0x6e\npanic(0xb463e0, 0x11b1c30)\n\t/usr/local/go/src/runtime/panic.go:491 +0x283\ngithub.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/blockchain.(*bpPeer).decrPending(0x0, 0x381)\n\t/home/ubuntu/go/src/github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/blockchain/pool.go:376 +0x22\ngithub.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/blockchain.(*BlockPool).AddBlock(0xc4200e4000, 0xc4266d1f00, 0x40, 0xc432ac9640, 0x381)\n\t/home/ubuntu/go/src/github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/blockchain/pool.go:215 +0x139\ngithub.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/blockchain.(*BlockchainReactor).Receive(0xc42050a780, 0xc420257740, 0x1171be0, 0xc42ff302d0, 0xc4384b2000, 0x381, 0x1000)\n\t/home/ubuntu/go/src/github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/blockchain/reactor.go:160 +0x712\ngithub.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p.createMConnection.func1(0x11e5040, 0xc4384b2000, 0x381, 0x1000)\n\t/home/ubuntu/go/src/github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p/peer.go:334 
+0xbd\ngithub.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p.(*MConnection).recvRoutine(0xc439a28870)\n\t/home/ubuntu/go/src/github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p/connection.go:475 +0x4a3\ncreated by github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p.(*MConnection).OnStart\n\t/home/ubuntu/go/src/github.com/cosmos/gaia/vendor/github.com/tendermint/tendermint/p2p/connection.go:170 +0x187\n" ```
2017-10-20 21:56:10 +04:00
}
2019-04-13 09:23:43 -04:00
return errPeerTooShort
2017-06-23 21:36:47 -04:00
}
2019-04-13 09:23:43 -04:00
if peer == nil {
// Add new peer.
peer = newBPPeer(peerID, height, pool.toBcR.sendPeerError)
peer.setLogger(pool.logger.With("peer", peerID))
pool.peers[peerID] = peer
2019-04-13 09:23:43 -04:00
} else {
// Update existing peer.
// Remove any requests made for heights in range (height, peer.height].
2019-04-13 09:23:43 -04:00
for h, block := range pool.peers[peerID].blocks {
if h <= height {
continue
}
// Reschedule the requests for all blocks waiting for the peer, or received and not processed yet.
if block == nil {
// Since block was not yet received it is counted in numPending, decrement.
pool.numPending--
pool.peers[peerID].numPending--
}
pool.rescheduleRequest(peerID, h)
}
peer.height = height
2015-03-24 11:02:30 -07:00
}
2019-04-13 09:23:43 -04:00
pool.updateMaxPeerHeight()
2015-03-24 11:02:30 -07:00
2019-04-13 09:23:43 -04:00
return nil
2015-03-22 12:46:53 -07:00
}
2019-04-13 09:23:43 -04:00
// Stops the peer timer and deletes the peer. Recomputes the max peer height.
func (pool *blockPool) deletePeer(peerID p2p.ID) {
if p, ok := pool.peers[peerID]; ok {
if p.timeout != nil {
p.timeout.Stop()
2015-03-22 12:46:53 -07:00
}
delete(pool.peers, peerID)
2019-04-13 09:23:43 -04:00
if p.height == pool.maxPeerHeight {
pool.updateMaxPeerHeight()
}
}
}
2019-04-13 09:23:43 -04:00
// Removes any blocks and requests associated with the peer and deletes the peer.
// Also triggers new requests if blocks have been removed.
func (pool *blockPool) removePeer(peerID p2p.ID, err error) {
pool.logger.Debug("removing peer", "peerID", peerID)
peer := pool.peers[peerID]
if peer == nil {
return
}
// Reschedule the requests for all blocks waiting for the peer, or received and not processed yet.
for h, block := range pool.peers[peerID].blocks {
if block == nil {
pool.numPending--
}
2019-04-13 09:23:43 -04:00
pool.rescheduleRequest(peerID, h)
}
2019-04-13 09:23:43 -04:00
pool.deletePeer(peerID)
2015-03-22 12:46:53 -07:00
2019-04-13 09:23:43 -04:00
}
2015-03-24 11:02:30 -07:00
2019-04-13 09:23:43 -04:00
// Called every time FSM advances its height.
func (pool *blockPool) removeShortPeers() {
for _, peer := range pool.peers {
2019-04-13 09:23:43 -04:00
if peer.height < pool.height {
pool.removePeer(peer.id, nil)
}
2015-03-22 03:30:22 -07:00
}
}
2019-04-15 23:26:07 -04:00
// removeBadPeers first drops peers that are too short, then removes every peer
// for which isGood reports an error. A slow peer is additionally reported
// through its own errFunc.
func (pool *blockPool) removeBadPeers() {
	pool.removeShortPeers()

	for _, peer := range pool.peers {
		err := peer.isGood()
		if err == nil {
			continue
		}

		pool.removePeer(peer.id, err)
		if err == errSlowPeer {
			peer.errFunc(errSlowPeer, peer.id)
		}
	}
}
// makeRequestBatch prunes bad peers, tops up the planned requests when the pool
// is running low, and returns all planned heights in ascending order.
//
// The number of requests wanted is the smaller of maxNumPendingRequests and the
// combined per-peer capacity, minus the requests already pending. No height
// above maxPeerHeight is ever planned.
func (pool *blockPool) makeRequestBatch(maxNumPendingRequests int32) []int {
	pool.removeBadPeers()

	// If running low on planned requests, make more.
	peerCapacity := len(pool.peers) * int(maxRequestsPerPeer)
	limit := cmn.MinInt(int(maxNumPendingRequests), peerCapacity)
	numNeeded := int32(limit) - pool.numPending
	for int32(len(pool.requests)) < numNeeded && pool.nextRequestHeight <= pool.maxPeerHeight {
		pool.requests[pool.nextRequestHeight] = true
		pool.nextRequestHeight++
	}

	// Map iteration order is random, so sort for a deterministic batch.
	heights := make([]int, 0, len(pool.requests))
	for h := range pool.requests {
		heights = append(heights, int(h))
	}
	sort.Ints(heights)

	return heights
}
func (pool *blockPool) makeNextRequests(maxNumPendingRequests int32) {
heights := pool.makeRequestBatch(maxNumPendingRequests)
pool.logger.Debug("makeNextRequests will make following requests", "number", len(heights), "heights", heights)
for _, height := range heights {
h := int64(height)
if !pool.sendRequest(h) {
2019-04-15 23:26:07 -04:00
return
}
delete(pool.requests, h)
}
}
func (pool *blockPool) sendRequest(height int64) bool {
2019-04-15 23:26:07 -04:00
for _, peer := range pool.peers {
if peer.numPending >= int32(maxRequestsPerPeer) {
continue
}
if peer.height < height {
continue
}
2019-05-07 21:06:43 -04:00
// Log with Info if the request is made for current processing height, Debug otherwise
if height == pool.height {
pool.logger.Info("assign request to peer", "peer", peer.id, "height", height)
} else {
pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height)
}
2019-04-15 23:26:07 -04:00
if err := pool.toBcR.sendBlockRequest(peer.id, height); err == errNilPeerForBlockRequest {
pool.removePeer(peer.id, err)
}
pool.blocks[height] = peer.id
pool.numPending++
peer.blocks[height] = nil
peer.incrPending()
return true
2019-04-15 23:26:07 -04:00
}
pool.logger.Error("could not find peer to send request for block at height", "height", height)
return false
2019-04-15 23:26:07 -04:00
}
2019-04-13 09:23:43 -04:00
// Validates that the block comes from the peer it was expected from and stores it in the 'blocks' map.
func (pool *blockPool) addBlock(peerID p2p.ID, block *types.Block, blockSize int) error {
if _, ok := pool.peers[peerID]; !ok {
pool.logger.Error("peer doesn't exist", "peer", peerID, "block_receieved", block.Height)
return errBadDataFromPeer
}
2019-04-13 09:23:43 -04:00
b, ok := pool.peers[peerID].blocks[block.Height]
if !ok {
pool.logger.Error("peer sent us a block we didn't expect", "peer", peerID, "blockHeight", block.Height)
if expPeerID, pok := pool.blocks[block.Height]; pok {
pool.logger.Error("expected this block from peer", "peer", expPeerID)
}
return errBadDataFromPeer
2017-09-06 13:11:47 -04:00
}
2019-04-13 09:23:43 -04:00
if b != nil {
pool.logger.Error("already have a block for height", "height", block.Height)
return errBadDataFromPeer
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
pool.peers[peerID].blocks[block.Height] = block
pool.blocks[block.Height] = peerID
pool.numPending--
pool.peers[peerID].decrPending(blockSize)
2019-05-07 21:06:43 -04:00
pool.logger.Info("added new block", "height", block.Height, "from_peer", peerID, "total", len(pool.blocks))
2019-04-13 09:23:43 -04:00
return nil
2015-03-24 11:02:30 -07:00
}
2019-04-13 09:23:43 -04:00
func (pool *blockPool) getBlockAndPeerAtHeight(height int64) (bData *blockData, err error) {
peerID := pool.blocks[height]
peer := pool.peers[peerID]
if peer == nil {
return nil, errMissingBlocks
2019-02-06 18:20:10 +04:00
}
2015-03-22 03:30:22 -07:00
2019-04-13 09:23:43 -04:00
block, ok := peer.blocks[height]
if !ok || block == nil {
return nil, errMissingBlocks
2019-04-13 09:23:43 -04:00
}
2016-06-28 18:02:27 -07:00
2019-04-13 09:23:43 -04:00
return &blockData{peer: peer, block: block}, nil
2017-05-02 11:53:32 +04:00
2015-03-22 03:30:22 -07:00
}
2019-04-13 09:23:43 -04:00
func (pool *blockPool) getNextTwoBlocks() (first, second *blockData, err error) {
first, err = pool.getBlockAndPeerAtHeight(pool.height)
second, err2 := pool.getBlockAndPeerAtHeight(pool.height + 1)
if err == nil {
err = err2
}
2019-04-13 09:23:43 -04:00
return
2017-05-02 11:53:32 +04:00
}
// Remove the peers that sent us the first two blocks, blocks are removed by removePeer().
2019-04-13 09:23:43 -04:00
func (pool *blockPool) invalidateFirstTwoBlocks(err error) {
first, err1 := pool.getBlockAndPeerAtHeight(pool.height)
second, err2 := pool.getBlockAndPeerAtHeight(pool.height + 1)
2019-04-13 09:23:43 -04:00
if err1 == nil {
pool.removePeer(first.peer.id, err)
}
2019-04-13 09:23:43 -04:00
if err2 == nil {
pool.removePeer(second.peer.id, err)
}
}
2019-04-13 09:23:43 -04:00
func (pool *blockPool) processedCurrentHeightBlock() {
peerID, peerOk := pool.blocks[pool.height]
if peerOk {
delete(pool.peers[peerID].blocks, pool.height)
}
2019-04-13 09:23:43 -04:00
delete(pool.blocks, pool.height)
pool.logger.Debug("processed and removed block at height", "height", pool.height)
pool.height++
pool.removeShortPeers()
}
// This function is called when the FSM is not able to make progress for a certain amount of time.
// This happens if the block at either pool.height or pool.height+1 has not been delivered during this time.
func (pool *blockPool) removePeerAtCurrentHeights(err error) {
2019-05-07 21:06:43 -04:00
peerID := pool.blocks[pool.height]
peer, ok := pool.peers[peerID]
if ok && peer.blocks[pool.height] == nil {
pool.removePeer(peer.id, err)
2019-05-07 21:06:43 -04:00
return
}
peerID = pool.blocks[pool.height+1]
peer, ok = pool.peers[peerID]
if ok && peer.blocks[pool.height+1] == nil {
pool.removePeer(peer.id, err)
}
}
2019-04-15 23:26:07 -04:00
func (pool *blockPool) cleanup() {
2019-04-13 09:23:43 -04:00
for _, peer := range pool.peers {
2019-04-15 23:26:07 -04:00
if peer.timeout != nil {
peer.timeout.Stop()
2019-04-13 09:23:43 -04:00
}
}
}