cleanup, golint, renaming stuff

Anca Zamfir
2019-06-19 15:18:08 +02:00
parent 7bf1683077
commit bf41ac2d2c
18 changed files with 526 additions and 679 deletions

View File

@@ -1,4 +1,4 @@
-package blockchain
+package v0
import (
"errors"

View File

@@ -1,4 +1,4 @@
-package blockchain
+package v0
import (
"fmt"

View File

@@ -1,4 +1,4 @@
-package blockchain
+package v0
import (
"errors"

View File

@@ -1,4 +1,4 @@
-package blockchain
+package v0
import (
"os"

View File

@@ -1,4 +1,4 @@
-package blockchain
+package v0
import (
amino "github.com/tendermint/go-amino"

View File

@@ -1,4 +1,4 @@
-package blockchainexp
+package v1
import (
"fmt"
@@ -13,14 +13,17 @@ import (
//--------
// Peer
-type bpPeerParams struct {
+// BpPeerParams stores the peer parameters that are used when creating a peer.
+type BpPeerParams struct {
timeout time.Duration
minRecvRate int64
sampleRate time.Duration
windowSize time.Duration
}
-type bpPeer struct {
+// BpPeer is the datastructure associated with a fast sync peer.
+type BpPeer struct {
logger log.Logger
ID p2p.ID
@@ -29,20 +32,20 @@ type bpPeer struct {
blocks map[int64]*types.Block // blocks received or expected to be received from this peer
blockResponseTimer *time.Timer
recvMonitor *flow.Monitor
-params *bpPeerParams // parameters for timer and monitor
+params *BpPeerParams // parameters for timer and monitor
onErr func(err error, peerID p2p.ID) // function to call on error
}
-// NewBPPeer creates a new peer.
-func NewBPPeer(
-peerID p2p.ID, height int64, onErr func(err error, peerID p2p.ID), params *bpPeerParams) *bpPeer {
+// NewBpPeer creates a new peer.
+func NewBpPeer(
+peerID p2p.ID, height int64, onErr func(err error, peerID p2p.ID), params *BpPeerParams) *BpPeer {
if params == nil {
-params = bpPeerDefaultParams()
+params = BpPeerDefaultParams()
}
-return &bpPeer{
+return &BpPeer{
ID: peerID,
Height: height,
blocks: make(map[int64]*types.Block, maxRequestsPerPeer),
@@ -53,17 +56,17 @@ func NewBPPeer(
}
// String returns a string representation of a peer.
-func (peer *bpPeer) String() string {
+func (peer *BpPeer) String() string {
return fmt.Sprintf("peer: %v height: %v pending: %v", peer.ID, peer.Height, peer.NumPendingBlockRequests)
}
// SetLogger sets the logger of the peer.
-func (peer *bpPeer) SetLogger(l log.Logger) {
+func (peer *BpPeer) SetLogger(l log.Logger) {
peer.logger = l
}
// Cleanup performs cleanup of the peer, removes blocks, requests, stops timers and monitors.
-func (peer *bpPeer) Cleanup() {
+func (peer *BpPeer) Cleanup() {
if peer.blockResponseTimer != nil {
peer.blockResponseTimer.Stop()
}
@@ -81,7 +84,7 @@ func (peer *bpPeer) Cleanup() {
}
// BlockAtHeight returns the block at a given height if available and errMissingBlock otherwise.
-func (peer *bpPeer) BlockAtHeight(height int64) (*types.Block, error) {
+func (peer *BpPeer) BlockAtHeight(height int64) (*types.Block, error) {
block, ok := peer.blocks[height]
if !ok {
return nil, errMissingRequest
@@ -94,7 +97,7 @@ func (peer *bpPeer) BlockAtHeight(height int64) (*types.Block, error) {
// AddBlock adds a block at peer level. Block must be non-nil and recvSize a positive integer
// The peer must have a pending request for this block.
-func (peer *bpPeer) AddBlock(block *types.Block, recvSize int) error {
+func (peer *BpPeer) AddBlock(block *types.Block, recvSize int) error {
if block == nil || recvSize < 0 {
panic("bad parameters")
}
@@ -123,12 +126,12 @@ func (peer *bpPeer) AddBlock(block *types.Block, recvSize int) error {
}
// RemoveBlock removes the block of given height
-func (peer *bpPeer) RemoveBlock(height int64) {
+func (peer *BpPeer) RemoveBlock(height int64) {
delete(peer.blocks, height)
}
// RequestSent records that a request was sent, and starts the peer timer and monitor if needed.
-func (peer *bpPeer) RequestSent(height int64) {
+func (peer *BpPeer) RequestSent(height int64) {
peer.blocks[height] = nil
if peer.NumPendingBlockRequests == 0 {
@@ -139,7 +142,7 @@ func (peer *bpPeer) RequestSent(height int64) {
}
// CheckRate verifies that the response rate of the peer is acceptable (higher than the minimum allowed).
-func (peer *bpPeer) CheckRate() error {
+func (peer *BpPeer) CheckRate() error {
if peer.NumPendingBlockRequests == 0 {
return nil
}
@@ -156,22 +159,22 @@ func (peer *bpPeer) CheckRate() error {
return nil
}
-func (peer *bpPeer) onTimeout() {
+func (peer *BpPeer) onTimeout() {
peer.onErr(errNoPeerResponse, peer.ID)
}
-func (peer *bpPeer) stopMonitor() {
+func (peer *BpPeer) stopMonitor() {
peer.recvMonitor.Done()
peer.recvMonitor = nil
}
-func (peer *bpPeer) startMonitor() {
+func (peer *BpPeer) startMonitor() {
peer.recvMonitor = flow.New(peer.params.sampleRate, peer.params.windowSize)
initialValue := float64(peer.params.minRecvRate) * math.E
peer.recvMonitor.SetREMA(initialValue)
}
-func (peer *bpPeer) resetBlockResponseTimer() {
+func (peer *BpPeer) resetBlockResponseTimer() {
if peer.blockResponseTimer == nil {
peer.blockResponseTimer = time.AfterFunc(peer.params.timeout, peer.onTimeout)
} else {
@@ -179,15 +182,16 @@ func (peer *bpPeer) resetBlockResponseTimer() {
}
}
-func (peer *bpPeer) stopBlockResponseTimer() bool {
+func (peer *BpPeer) stopBlockResponseTimer() bool {
if peer.blockResponseTimer == nil {
return false
}
return peer.blockResponseTimer.Stop()
}
-func bpPeerDefaultParams() *bpPeerParams {
-return &bpPeerParams{
+// BpPeerDefaultParams returns the default peer parameters.
+func BpPeerDefaultParams() *BpPeerParams {
+return &BpPeerParams{
// Timeout for a peer to respond to a block request.
timeout: 15 * time.Second,
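
Since NewBpPeer, BpPeer, BpPeerParams and BpPeerDefaultParams are now exported, code outside this file can construct and drive a fast sync peer directly. A minimal sketch of that call pattern is shown below; it assumes the renamed package lives at blockchain/v1 and that the usual Tendermint logger and p2p import paths apply, so treat it as illustrative rather than code from this commit.

package main

import (
	"fmt"
	"os"

	v1 "github.com/tendermint/tendermint/blockchain/v1" // assumed path of the renamed package
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/p2p"
)

func main() {
	// Called by the peer when it times out or its receive rate drops too low.
	onErr := func(err error, peerID p2p.ID) {
		fmt.Println("peer error:", peerID, err)
	}

	// A nil params argument falls back to BpPeerDefaultParams() inside NewBpPeer.
	peer := v1.NewBpPeer(p2p.ID("peer1"), 100, onErr, nil)
	peer.SetLogger(log.NewTMLogger(os.Stdout))

	peer.RequestSent(10) // starts the block response timer and the rate monitor
	if err := peer.CheckRate(); err != nil {
		fmt.Println("slow peer:", err)
	}
	peer.Cleanup() // stops timers and monitors
}

Note that the fields of BpPeerParams stay unexported, so callers outside the package can only rely on the defaults obtained by passing nil (or BpPeerDefaultParams()) rather than tuning timeout or minRecvRate directly.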

View File

@@ -1,4 +1,4 @@
-package blockchainexp
+package v1
import (
"sync"
@@ -15,7 +15,7 @@ import (
)
func TestPeerMonitor(t *testing.T) {
-peer := NewBPPeer(
+peer := NewBpPeer(
p2p.ID(cmn.RandStr(12)), 10,
func(err error, _ p2p.ID) {},
nil)
@@ -32,9 +32,9 @@ func TestPeerResetBlockResponseTimer(t *testing.T) {
lastErr error // last generated error
peerTestMtx sync.Mutex // modifications of ^^ variables are also done from timer handler goroutine
)
-params := &bpPeerParams{timeout: 2 * time.Millisecond}
+params := &BpPeerParams{timeout: 2 * time.Millisecond}
-peer := NewBPPeer(
+peer := NewBpPeer(
p2p.ID(cmn.RandStr(12)), 10,
func(err error, _ p2p.ID) {
peerTestMtx.Lock()
@@ -72,9 +72,9 @@ func TestPeerResetBlockResponseTimer(t *testing.T) {
}
func TestPeerRequestSent(t *testing.T) {
-params := &bpPeerParams{timeout: 2 * time.Millisecond}
+params := &BpPeerParams{timeout: 2 * time.Millisecond}
-peer := NewBPPeer(
+peer := NewBpPeer(
p2p.ID(cmn.RandStr(12)), 10,
func(err error, _ p2p.ID) {},
params)
@@ -93,7 +93,7 @@ func TestPeerRequestSent(t *testing.T) {
}
func TestPeerGetAndRemoveBlock(t *testing.T) {
-peer := NewBPPeer(
+peer := NewBpPeer(
p2p.ID(cmn.RandStr(12)), 100,
func(err error, _ p2p.ID) {},
nil)
@@ -141,7 +141,7 @@ func TestPeerGetAndRemoveBlock(t *testing.T) {
}
func TestPeerAddBlock(t *testing.T) {
-peer := NewBPPeer(
+peer := NewBpPeer(
p2p.ID(cmn.RandStr(12)), 100,
func(err error, _ p2p.ID) {},
nil)
@@ -180,14 +180,14 @@ func TestPeerAddBlock(t *testing.T) {
func TestPeerOnErrFuncCalledDueToExpiration(t *testing.T) {
-params := &bpPeerParams{timeout: 2 * time.Millisecond}
+params := &BpPeerParams{timeout: 2 * time.Millisecond}
var (
numErrFuncCalls int // number of calls to the onErr function
lastErr error // last generated error
peerTestMtx sync.Mutex // modifications of ^^ variables are also done from timer handler goroutine
)
-peer := NewBPPeer(
+peer := NewBpPeer(
p2p.ID(cmn.RandStr(12)), 10,
func(err error, _ p2p.ID) {
peerTestMtx.Lock()
@@ -209,11 +209,11 @@ func TestPeerOnErrFuncCalledDueToExpiration(t *testing.T) {
}
func TestPeerCheckRate(t *testing.T) {
-params := &bpPeerParams{
+params := &BpPeerParams{
timeout: time.Second,
minRecvRate: int64(100), // 100 bytes/sec exponential moving average
}
-peer := NewBPPeer(
+peer := NewBpPeer(
p2p.ID(cmn.RandStr(12)), 10,
func(err error, _ p2p.ID) {},
params)
@@ -245,9 +245,9 @@ func TestPeerCheckRate(t *testing.T) {
}
func TestPeerCleanup(t *testing.T) {
-params := &bpPeerParams{timeout: 2 * time.Millisecond}
+params := &BpPeerParams{timeout: 2 * time.Millisecond}
-peer := NewBPPeer(
+peer := NewBpPeer(
p2p.ID(cmn.RandStr(12)), 10,
func(err error, _ p2p.ID) {},
params)
@@ -263,7 +263,7 @@ func TestPeerCleanup(t *testing.T) {
// Check if peer timer is running or not (a running timer can be successfully stopped).
// Note: stops the timer.
-func checkByStoppingPeerTimer(t *testing.T, peer *bpPeer, running bool) {
+func checkByStoppingPeerTimer(t *testing.T, peer *BpPeer, running bool) {
assert.NotPanics(t, func() {
stopped := peer.stopBlockResponseTimer()
if running {

View File

@@ -1,4 +1,4 @@
-package blockchainexp
+package v1
import (
"sort"
@@ -8,10 +8,11 @@ import (
"github.com/tendermint/tendermint/types"
)
-type blockPool struct {
+// BlockPool keeps track of the fast sync peers, block requests and block responses.
+type BlockPool struct {
logger log.Logger
// Set of peers that have sent status responses, with height bigger than pool.Height
-peers map[p2p.ID]*bpPeer
+peers map[p2p.ID]*BpPeer
// Set of block heights and the corresponding peers from where a block response is expected or has been received.
blocks map[int64]p2p.ID
@@ -23,10 +24,10 @@ type blockPool struct {
toBcR bcReactor
}
-// NewBlockPool creates a new blockPool.
-func NewBlockPool(height int64, toBcR bcReactor) *blockPool {
-return &blockPool{
-peers: make(map[p2p.ID]*bpPeer),
+// NewBlockPool creates a new BlockPool.
+func NewBlockPool(height int64, toBcR bcReactor) *BlockPool {
+return &BlockPool{
+peers: make(map[p2p.ID]*BpPeer),
MaxPeerHeight: 0,
blocks: make(map[int64]p2p.ID),
plannedRequests: make(map[int64]struct{}),
@@ -37,16 +38,16 @@ func NewBlockPool(height int64, toBcR bcReactor) *blockPool {
}
// SetLogger sets the logger of the pool.
-func (pool *blockPool) SetLogger(l log.Logger) {
+func (pool *BlockPool) SetLogger(l log.Logger) {
pool.logger = l
}
// ReachedMaxHeight check if the pool has reached the maximum peer height.
-func (pool *blockPool) ReachedMaxHeight() bool {
+func (pool *BlockPool) ReachedMaxHeight() bool {
return pool.Height >= pool.MaxPeerHeight
}
-func (pool *blockPool) rescheduleRequest(peerID p2p.ID, height int64) {
+func (pool *BlockPool) rescheduleRequest(peerID p2p.ID, height int64) {
pool.logger.Info("reschedule requests made to peer for height ", "peerID", peerID, "height", height)
pool.plannedRequests[height] = struct{}{}
delete(pool.blocks, height)
@@ -54,7 +55,7 @@ func (pool *blockPool) rescheduleRequest(peerID p2p.ID, height int64) {
}
// Updates the pool's max height. If no peers are left MaxPeerHeight is set to 0.
-func (pool *blockPool) updateMaxPeerHeight() {
+func (pool *BlockPool) updateMaxPeerHeight() {
var newMax int64
for _, peer := range pool.peers {
peerHeight := peer.Height
@@ -66,8 +67,8 @@
}
// UpdatePeer adds a new peer or updates an existing peer with a new height.
-// If a peer is too short it is not added.
-func (pool *blockPool) UpdatePeer(peerID p2p.ID, height int64) error {
+// If a peer is short it is not added.
+func (pool *BlockPool) UpdatePeer(peerID p2p.ID, height int64) error {
peer := pool.peers[peerID]
@@ -78,7 +79,7 @@
return errPeerTooShort
}
// Add new peer.
-peer = NewBPPeer(peerID, height, pool.toBcR.sendPeerError, nil)
+peer = NewBpPeer(peerID, height, pool.toBcR.sendPeerError, nil)
peer.SetLogger(pool.logger.With("peer", peerID))
pool.peers[peerID] = peer
pool.logger.Info("added peer", "peerID", peerID, "height", height, "num_peers", len(pool.peers))
@@ -99,7 +100,7 @@
}
// Cleans and deletes the peer. Recomputes the max peer height.
-func (pool *blockPool) deletePeer(peer *bpPeer) {
+func (pool *BlockPool) deletePeer(peer *BpPeer) {
if peer == nil {
return
}
@@ -112,7 +113,7 @@
}
// RemovePeer removes the blocks and requests from the peer, reschedules them and deletes the peer.
-func (pool *blockPool) RemovePeer(peerID p2p.ID, err error) {
+func (pool *BlockPool) RemovePeer(peerID p2p.ID, err error) {
peer := pool.peers[peerID]
if peer == nil {
return
@@ -145,7 +146,7 @@ func (pool *blockPool) RemovePeer(peerID p2p.ID, err error) {
}
}
-func (pool *blockPool) removeShortPeers() {
+func (pool *BlockPool) removeShortPeers() {
for _, peer := range pool.peers {
if peer.Height < pool.Height {
pool.RemovePeer(peer.ID, nil)
@@ -153,7 +154,7 @@ func (pool *blockPool) removeShortPeers() {
}
}
-func (pool *blockPool) removeBadPeers() {
+func (pool *BlockPool) removeBadPeers() {
pool.removeShortPeers()
for _, peer := range pool.peers {
if err := peer.CheckRate(); err != nil {
@@ -164,9 +165,10 @@
}
// MakeNextRequests creates more requests if the block pool is running low.
-func (pool *blockPool) MakeNextRequests(maxNumRequests int) {
+func (pool *BlockPool) MakeNextRequests(maxNumRequests int) {
heights := pool.makeRequestBatch(maxNumRequests)
-pool.logger.Info("makeNextRequests will make following requests", "number", len(heights), "heights", heights)
+pool.logger.Info("makeNextRequests will make following requests",
+"number", len(heights), "heights", heights)
for _, height := range heights {
h := int64(height)
@@ -180,7 +182,7 @@
}
// Makes a batch of requests sorted by height such that the block pool has up to maxNumRequests entries.
-func (pool *blockPool) makeRequestBatch(maxNumRequests int) []int {
+func (pool *BlockPool) makeRequestBatch(maxNumRequests int) []int {
pool.removeBadPeers()
// At this point pool.requests may include heights for requests to be redone due to removal of peers:
// - peers timed out or were removed by switch
@@ -204,7 +206,7 @@
return heights
}
-func (pool *blockPool) sendRequest(height int64) bool {
+func (pool *BlockPool) sendRequest(height int64) bool {
for _, peer := range pool.peers {
if peer.NumPendingBlockRequests >= maxRequestsPerPeer {
continue
@@ -216,7 +218,8 @@
err := pool.toBcR.sendBlockRequest(peer.ID, height)
if err == errNilPeerForBlockRequest {
// Switch does not have this peer, remove it and continue to look for another peer.
-pool.logger.Error("switch does not have peer..removing peer selected for height", "peer", peer.ID, "height", height)
+pool.logger.Error("switch does not have peer..removing peer selected for height", "peer",
+peer.ID, "height", height)
pool.RemovePeer(peer.ID, err)
continue
}
@@ -237,8 +240,8 @@
return false
}
-// Validates that the block comes from the peer it was expected from and stores it in the 'blocks' map.
-func (pool *blockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) error {
+// AddBlock validates that the block comes from the peer it was expected from and stores it in the 'blocks' map.
+func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) error {
peer, ok := pool.peers[peerID]
if !ok {
pool.logger.Error("block from unknown peer", "height", block.Height, "peer", peerID)
@@ -253,14 +256,15 @@ func (pool *blockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
return peer.AddBlock(block, blockSize)
}
-type blockData struct {
+// BlockData stores the peer responsible to deliver a block and the actual block if delivered.
+type BlockData struct {
block *types.Block
-peer *bpPeer
+peer *BpPeer
}
// BlockAndPeerAtHeight retrieves the block and delivery peer at specified height.
// Returns errMissingBlock if a block was not found
-func (pool *blockPool) BlockAndPeerAtHeight(height int64) (bData *blockData, err error) {
+func (pool *BlockPool) BlockAndPeerAtHeight(height int64) (bData *BlockData, err error) {
peerID := pool.blocks[height]
peer := pool.peers[peerID]
if peer == nil {
@@ -272,12 +276,12 @@ func (pool *blockPool) BlockAndPeerAtHeight(height int64) (bData *blockData, err
return nil, err
}
-return &blockData{peer: peer, block: block}, nil
+return &BlockData{peer: peer, block: block}, nil
}
// FirstTwoBlocksAndPeers returns the blocks and the delivery peers at pool's height H and H+1.
-func (pool *blockPool) FirstTwoBlocksAndPeers() (first, second *blockData, err error) {
+func (pool *BlockPool) FirstTwoBlocksAndPeers() (first, second *BlockData, err error) {
first, err = pool.BlockAndPeerAtHeight(pool.Height)
second, err2 := pool.BlockAndPeerAtHeight(pool.Height + 1)
if err == nil {
@@ -287,7 +291,7 @@ func (pool *blockPool) FirstTwoBlocksAndPeers() (first, second *blockData, err e
}
// InvalidateFirstTwoBlocks removes the peers that sent us the first two blocks, blocks are removed by RemovePeer().
-func (pool *blockPool) InvalidateFirstTwoBlocks(err error) {
+func (pool *BlockPool) InvalidateFirstTwoBlocks(err error) {
first, err1 := pool.BlockAndPeerAtHeight(pool.Height)
second, err2 := pool.BlockAndPeerAtHeight(pool.Height + 1)
@@ -301,7 +305,7 @@ func (pool *blockPool) InvalidateFirstTwoBlocks(err error) {
// ProcessedCurrentHeightBlock performs cleanup after a block is processed. It removes block at pool height and
// the peers that are now short.
-func (pool *blockPool) ProcessedCurrentHeightBlock() {
+func (pool *BlockPool) ProcessedCurrentHeightBlock() {
peerID, peerOk := pool.blocks[pool.Height]
if peerOk {
pool.peers[peerID].RemoveBlock(pool.Height)
@@ -316,7 +320,7 @@
// delivery peer and returns. If a block at height H exists then the check and peer removal is done for H+1.
// This function is called when the FSM is not able to make progress for some time.
// This happens if either the block H or H+1 have not been delivered.
-func (pool *blockPool) RemovePeerAtCurrentHeights(err error) {
+func (pool *BlockPool) RemovePeerAtCurrentHeights(err error) {
peerID := pool.blocks[pool.Height]
peer, ok := pool.peers[peerID]
if ok {
@@ -340,17 +344,18 @@
}
// Cleanup performs pool and peer cleanup
-func (pool *blockPool) Cleanup() {
+func (pool *BlockPool) Cleanup() {
for _, peer := range pool.peers {
peer.Cleanup()
}
}
// NumPeers returns the number of peers in the pool
-func (pool *blockPool) NumPeers() int {
+func (pool *BlockPool) NumPeers() int {
return len(pool.peers)
}
-func (pool *blockPool) NeedsBlocks() bool {
+// NeedsBlocks returns true if more blocks are required.
+func (pool *BlockPool) NeedsBlocks() bool {
return len(pool.blocks) < maxNumRequests
}
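
With the pool type now exported, it helps to see the order in which the reactor FSM drives it. The sketch below is hypothetical glue written for illustration only (examplePoolFlow and its parameters are not part of the commit); it has to live inside package v1 because bcReactor, the callback interface passed to NewBlockPool, remains unexported.

package v1

import (
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/types"
)

// examplePoolFlow is a hypothetical helper showing the order in which the
// reactor FSM exercises the exported BlockPool API.
func examplePoolFlow(toBcR bcReactor, block *types.Block, blockSize int, logger log.Logger) {
	pool := NewBlockPool(100, toBcR)
	pool.SetLogger(logger)

	// A status response from peer "P1" at height 120 adds or updates the peer;
	// errPeerTooShort / errPeerLowersItsHeight are surfaced here.
	if err := pool.UpdatePeer("P1", 120); err != nil {
		logger.Error("rejected peer update", "err", err)
	}

	// Plan and send up to maxNumRequests block requests across the known peers.
	if pool.NeedsBlocks() {
		pool.MakeNextRequests(maxNumRequests)
	}

	// A block response is recorded against the peer it was requested from.
	if err := pool.AddBlock("P1", block, blockSize); err != nil {
		pool.RemovePeer("P1", err)
	}

	// Once blocks at pool.Height and pool.Height+1 are both present and verified,
	// advance the pool past the current height.
	if _, _, err := pool.FirstTwoBlocksAndPeers(); err == nil {
		pool.ProcessedCurrentHeightBlock()
	}
}

The same sequence is what the table-driven tests in the next file exercise through makeBlockPool and the testBcR helper.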

View File

@@ -1,4 +1,4 @@
-package blockchainexp
+package v1
import (
"testing"
@@ -59,7 +59,7 @@ type tPBlocks struct {
}
// Makes a block pool with specified current height, list of peers, block requests and block responses
-func makeBlockPool(bcr *testBcR, height int64, peers []bpPeer, blocks map[int64]tPBlocks) *blockPool {
+func makeBlockPool(bcr *testBcR, height int64, peers []BpPeer, blocks map[int64]tPBlocks) *BlockPool {
bPool := NewBlockPool(height, bcr)
bPool.SetLogger(bcr.logger)
@@ -70,7 +70,7 @@ func makeBlockPool(bcr *testBcR, height int64, peers []bpPeer, blocks map[int64]
if p.Height > maxH {
maxH = p.Height
}
-bPool.peers[p.ID] = NewBPPeer(p.ID, p.Height, bcr.sendPeerError, nil)
+bPool.peers[p.ID] = NewBpPeer(p.ID, p.Height, bcr.sendPeerError, nil)
bPool.peers[p.ID].SetLogger(bcr.logger)
}
@@ -86,7 +86,7 @@ func makeBlockPool(bcr *testBcR, height int64, peers []bpPeer, blocks map[int64]
return bPool
}
-func assertPeerSetsEquivalent(t *testing.T, set1 map[p2p.ID]*bpPeer, set2 map[p2p.ID]*bpPeer) {
+func assertPeerSetsEquivalent(t *testing.T, set1 map[p2p.ID]*BpPeer, set2 map[p2p.ID]*BpPeer) {
assert.Equal(t, len(set1), len(set2))
for peerID, peer1 := range set1 {
peer2 := set2[peerID]
@@ -102,7 +102,7 @@ func assertPeerSetsEquivalent(t *testing.T, set1 map[p2p.ID]*bpPeer, set2 map[p2
}
}
-func assertBlockPoolEquivalent(t *testing.T, poolWanted, pool *blockPool) {
+func assertBlockPoolEquivalent(t *testing.T, poolWanted, pool *BlockPool) {
assert.Equal(t, poolWanted.blocks, pool.blocks)
assertPeerSetsEquivalent(t, poolWanted.peers, pool.peers)
assert.Equal(t, poolWanted.MaxPeerHeight, pool.MaxPeerHeight)
@@ -115,45 +115,45 @@ func TestBlockPoolUpdatePeer(t *testing.T) {
tests := []struct {
name string
-pool *blockPool
+pool *BlockPool
args testPeer
-poolWanted *blockPool
+poolWanted *BlockPool
errWanted error
}{
{
name: "add a first short peer",
-pool: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
+pool: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}),
args: testPeer{"P1", 50},
errWanted: errPeerTooShort,
-poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
+poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}),
},
{
name: "add a first good peer",
-pool: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
+pool: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}),
args: testPeer{"P1", 101},
-poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 101}}, map[int64]tPBlocks{}),
+poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 101}}, map[int64]tPBlocks{}),
},
{
name: "increase the height of P1 from 120 to 123",
-pool: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}),
+pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}),
args: testPeer{"P1", 123},
-poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 123}}, map[int64]tPBlocks{}),
+poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 123}}, map[int64]tPBlocks{}),
},
{
name: "decrease the height of P1 from 120 to 110",
-pool: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}),
+pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}),
args: testPeer{"P1", 110},
errWanted: errPeerLowersItsHeight,
-poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
+poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}),
},
{
name: "decrease the height of P1 from 105 to 102 with blocks",
-pool: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 105}},
+pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 105}},
map[int64]tPBlocks{
100: {"P1", true}, 101: {"P1", true}, 102: {"P1", true}}),
args: testPeer{"P1", 102},
errWanted: errPeerLowersItsHeight,
-poolWanted: makeBlockPool(testBcR, 100, []bpPeer{},
+poolWanted: makeBlockPool(testBcR, 100, []BpPeer{},
map[int64]tPBlocks{}),
},
}
@@ -180,54 +180,54 @@ func TestBlockPoolRemovePeer(t *testing.T) {
tests := []struct {
name string
-pool *blockPool
+pool *BlockPool
args args
-poolWanted *blockPool
+poolWanted *BlockPool
}{
{
name: "attempt to delete non-existing peer",
-pool: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}),
+pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}),
args: args{"P99", nil},
-poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}),
+poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}),
},
{
name: "delete the only peer without blocks",
-pool: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}),
+pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}}, map[int64]tPBlocks{}),
args: args{"P1", nil},
-poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
+poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}),
},
{
name: "delete the shortest of two peers without blocks",
-pool: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 120}}, map[int64]tPBlocks{}),
+pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 120}}, map[int64]tPBlocks{}),
args: args{"P1", nil},
-poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P2", Height: 120}}, map[int64]tPBlocks{}),
+poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P2", Height: 120}}, map[int64]tPBlocks{}),
},
{
name: "delete the tallest of two peers without blocks",
-pool: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 120}}, map[int64]tPBlocks{}),
+pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 120}}, map[int64]tPBlocks{}),
args: args{"P2", nil},
-poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 100}}, map[int64]tPBlocks{}),
+poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 100}}, map[int64]tPBlocks{}),
},
{
name: "delete the only peer with block requests sent and blocks received",
-pool: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 120}},
+pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}},
map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", false}}),
args: args{"P1", nil},
-poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
+poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}),
},
{
name: "delete the shortest of two peers with block requests sent and blocks received",
-pool: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 200}},
+pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 200}},
map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", false}}),
args: args{"P1", nil},
-poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P2", Height: 200}}, map[int64]tPBlocks{}),
+poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P2", Height: 200}}, map[int64]tPBlocks{}),
},
{
name: "delete the tallest of two peers with block requests sent and blocks received",
-pool: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 110}},
+pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 110}},
map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", false}}),
args: args{"P1", nil},
-poolWanted: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P2", Height: 110}}, map[int64]tPBlocks{}),
+poolWanted: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P2", Height: 110}}, map[int64]tPBlocks{}),
},
}
@@ -244,30 +244,30 @@ func TestBlockPoolRemoveShortPeers(t *testing.T) {
tests := []struct {
name string
-pool *blockPool
-poolWanted *blockPool
+pool *BlockPool
+poolWanted *BlockPool
}{
{
name: "no short peers",
pool: makeBlockPool(testBcR, 100,
-[]bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 110}, {ID: "P3", Height: 120}}, map[int64]tPBlocks{}),
+[]BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 110}, {ID: "P3", Height: 120}}, map[int64]tPBlocks{}),
poolWanted: makeBlockPool(testBcR, 100,
-[]bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 110}, {ID: "P3", Height: 120}}, map[int64]tPBlocks{}),
+[]BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 110}, {ID: "P3", Height: 120}}, map[int64]tPBlocks{}),
},
{
name: "one short peer",
pool: makeBlockPool(testBcR, 100,
-[]bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 90}, {ID: "P3", Height: 120}}, map[int64]tPBlocks{}),
+[]BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 90}, {ID: "P3", Height: 120}}, map[int64]tPBlocks{}),
poolWanted: makeBlockPool(testBcR, 100,
-[]bpPeer{{ID: "P1", Height: 100}, {ID: "P3", Height: 120}}, map[int64]tPBlocks{}),
+[]BpPeer{{ID: "P1", Height: 100}, {ID: "P3", Height: 120}}, map[int64]tPBlocks{}),
},
{
name: "all short peers",
pool: makeBlockPool(testBcR, 100,
-[]bpPeer{{ID: "P1", Height: 90}, {ID: "P2", Height: 91}, {ID: "P3", Height: 92}}, map[int64]tPBlocks{}),
-poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
+[]BpPeer{{ID: "P1", Height: 90}, {ID: "P2", Height: 91}, {ID: "P3", Height: 92}}, map[int64]tPBlocks{}),
+poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}),
},
}
@@ -290,7 +290,7 @@ func TestBlockPoolSendRequestBatch(t *testing.T) {
tests := []struct {
name string
-pool *blockPool
+pool *BlockPool
maxRequestsPerPeer int
expRequests map[int64]bool
expPeerResults []testPeerResult
@@ -298,7 +298,7 @@
}{
{
name: "one peer - send up to maxRequestsPerPeer block requests",
-pool: makeBlockPool(testBcR, 10, []bpPeer{{ID: "P1", Height: 100}}, map[int64]tPBlocks{}),
+pool: makeBlockPool(testBcR, 10, []BpPeer{{ID: "P1", Height: 100}}, map[int64]tPBlocks{}),
maxRequestsPerPeer: 2,
expRequests: map[int64]bool{10: true, 11: true},
expPeerResults: []testPeerResult{{id: "P1", numPendingBlockRequests: 2}},
@@ -306,7 +306,7 @@
},
{
name: "n peers - send n*maxRequestsPerPeer block requests",
-pool: makeBlockPool(testBcR, 10, []bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}}, map[int64]tPBlocks{}),
+pool: makeBlockPool(testBcR, 10, []BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}}, map[int64]tPBlocks{}),
maxRequestsPerPeer: 2,
expRequests: map[int64]bool{10: true, 11: true},
expPeerResults: []testPeerResult{
@@ -347,24 +347,24 @@ func TestBlockPoolAddBlock(t *testing.T) {
}
tests := []struct {
name string
-pool *blockPool
+pool *BlockPool
args args
-poolWanted *blockPool
+poolWanted *BlockPool
errWanted error
}{
{name: "block from unknown peer",
-pool: makeBlockPool(testBcR, 10, []bpPeer{{ID: "P1", Height: 100}}, map[int64]tPBlocks{}),
+pool: makeBlockPool(testBcR, 10, []BpPeer{{ID: "P1", Height: 100}}, map[int64]tPBlocks{}),
args: args{
peerID: "P2",
block: types.MakeBlock(int64(10), txs, nil, nil),
blockSize: 100,
},
-poolWanted: makeBlockPool(testBcR, 10, []bpPeer{{ID: "P1", Height: 100}}, map[int64]tPBlocks{}),
+poolWanted: makeBlockPool(testBcR, 10, []BpPeer{{ID: "P1", Height: 100}}, map[int64]tPBlocks{}),
errWanted: errBadDataFromPeer,
},
{name: "unexpected block 11 from known peer - waiting for 10",
pool: makeBlockPool(testBcR, 10,
-[]bpPeer{{ID: "P1", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}},
map[int64]tPBlocks{10: {"P1", false}}),
args: args{
peerID: "P1",
@@ -372,13 +372,13 @@ func TestBlockPoolAddBlock(t *testing.T) {
blockSize: 100,
},
poolWanted: makeBlockPool(testBcR, 10,
-[]bpPeer{{ID: "P1", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}},
map[int64]tPBlocks{10: {"P1", false}}),
errWanted: errMissingRequest,
},
{name: "unexpected block 10 from known peer - already have 10",
pool: makeBlockPool(testBcR, 10,
-[]bpPeer{{ID: "P1", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}},
map[int64]tPBlocks{10: {"P1", true}, 11: {"P1", false}}),
args: args{
peerID: "P1",
@@ -386,13 +386,13 @@ func TestBlockPoolAddBlock(t *testing.T) {
blockSize: 100,
},
poolWanted: makeBlockPool(testBcR, 10,
-[]bpPeer{{ID: "P1", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}},
map[int64]tPBlocks{10: {"P1", true}, 11: {"P1", false}}),
errWanted: errDuplicateBlock,
},
{name: "unexpected block 10 from known peer P2 - expected 10 to come from P1",
pool: makeBlockPool(testBcR, 10,
-[]bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
map[int64]tPBlocks{10: {"P1", false}}),
args: args{
peerID: "P2",
@@ -400,13 +400,13 @@ func TestBlockPoolAddBlock(t *testing.T) {
blockSize: 100,
},
poolWanted: makeBlockPool(testBcR, 10,
-[]bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
map[int64]tPBlocks{10: {"P1", false}}),
errWanted: errBadDataFromPeer,
},
{name: "expected block from known peer",
pool: makeBlockPool(testBcR, 10,
-[]bpPeer{{ID: "P1", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}},
map[int64]tPBlocks{10: {"P1", false}}),
args: args{
peerID: "P1",
@@ -414,7 +414,7 @@ func TestBlockPoolAddBlock(t *testing.T) {
blockSize: 100,
},
poolWanted: makeBlockPool(testBcR, 10,
-[]bpPeer{{ID: "P1", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}},
map[int64]tPBlocks{10: {"P1", true}}),
errWanted: nil,
},
@@ -434,7 +434,7 @@ func TestBlockPoolFirstTwoBlocksAndPeers(t *testing.T) {
tests := []struct {
name string
-pool *blockPool
+pool *BlockPool
firstWanted int64
secondWanted int64
errWanted error
@@ -442,14 +442,14 @@
{
name: "both blocks missing",
pool: makeBlockPool(testBcR, 10,
-[]bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
map[int64]tPBlocks{15: {"P1", true}, 16: {"P2", true}}),
errWanted: errMissingBlock,
},
{
name: "second block missing",
pool: makeBlockPool(testBcR, 15,
-[]bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
map[int64]tPBlocks{15: {"P1", true}, 18: {"P2", true}}),
firstWanted: 15,
errWanted: errMissingBlock,
@@ -457,7 +457,7 @@
{
name: "first block missing",
pool: makeBlockPool(testBcR, 15,
-[]bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
map[int64]tPBlocks{16: {"P2", true}, 18: {"P2", true}}),
secondWanted: 16,
errWanted: errMissingBlock,
@@ -465,7 +465,7 @@
{
name: "both blocks present",
pool: makeBlockPool(testBcR, 10,
-[]bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
map[int64]tPBlocks{10: {"P1", true}, 11: {"P2", true}}),
firstWanted: 10,
secondWanted: 11,
@@ -482,7 +482,7 @@ func TestBlockPoolFirstTwoBlocksAndPeers(t *testing.T) {
peer := pool.blocks[tt.firstWanted]
block := pool.peers[peer].blocks[tt.firstWanted]
assert.Equal(t, block, gotFirst.block,
-"blockPool.FirstTwoBlocksAndPeers() gotFirst = %v, want %v",
+"BlockPool.FirstTwoBlocksAndPeers() gotFirst = %v, want %v",
tt.firstWanted, gotFirst.block.Height)
}
@@ -490,7 +490,7 @@
peer := pool.blocks[tt.secondWanted]
block := pool.peers[peer].blocks[tt.secondWanted]
assert.Equal(t, block, gotSecond.block,
-"blockPool.FirstTwoBlocksAndPeers() gotFirst = %v, want %v",
+"BlockPool.FirstTwoBlocksAndPeers() gotFirst = %v, want %v",
tt.secondWanted, gotSecond.block.Height)
}
})
@@ -502,43 +502,43 @@ func TestBlockPoolInvalidateFirstTwoBlocks(t *testing.T) {
tests := []struct {
name string
-pool *blockPool
-poolWanted *blockPool
+pool *BlockPool
+poolWanted *BlockPool
}{
{
name: "both blocks missing",
pool: makeBlockPool(testBcR, 10,
-[]bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
map[int64]tPBlocks{15: {"P1", true}, 16: {"P2", true}}),
poolWanted: makeBlockPool(testBcR, 10,
-[]bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
map[int64]tPBlocks{15: {"P1", true}, 16: {"P2", true}}),
},
{
name: "second block missing",
pool: makeBlockPool(testBcR, 15,
-[]bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
map[int64]tPBlocks{15: {"P1", true}, 18: {"P2", true}}),
poolWanted: makeBlockPool(testBcR, 15,
-[]bpPeer{{ID: "P2", Height: 100}},
+[]BpPeer{{ID: "P2", Height: 100}},
map[int64]tPBlocks{18: {"P2", true}}),
},
{
name: "first block missing",
pool: makeBlockPool(testBcR, 15,
-[]bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
map[int64]tPBlocks{18: {"P1", true}, 16: {"P2", true}}),
poolWanted: makeBlockPool(testBcR, 15,
-[]bpPeer{{ID: "P1", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}},
map[int64]tPBlocks{18: {"P1", true}}),
},
{
name: "both blocks present",
pool: makeBlockPool(testBcR, 10,
-[]bpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
+[]BpPeer{{ID: "P1", Height: 100}, {ID: "P2", Height: 100}},
map[int64]tPBlocks{10: {"P1", true}, 11: {"P2", true}}),
poolWanted: makeBlockPool(testBcR, 10,
-[]bpPeer{},
+[]BpPeer{},
map[int64]tPBlocks{}),
},
}
@@ -556,26 +556,26 @@ func TestProcessedCurrentHeightBlock(t *testing.T) {
tests := []struct {
name string
-pool *blockPool
-poolWanted *blockPool
+pool *BlockPool
+poolWanted *BlockPool
}{
{
name: "one peer",
-pool: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 120}},
+pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}},
map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", true}}),
-poolWanted: makeBlockPool(testBcR, 101, []bpPeer{{ID: "P1", Height: 120}},
+poolWanted: makeBlockPool(testBcR, 101, []BpPeer{{ID: "P1", Height: 120}},
map[int64]tPBlocks{101: {"P1", true}}),
},
{
name: "multiple peers",
pool: makeBlockPool(testBcR, 100,
-[]bpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 120}, {ID: "P3", Height: 130}},
+[]BpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 120}, {ID: "P3", Height: 130}},
map[int64]tPBlocks{
100: {"P1", true}, 104: {"P1", true}, 105: {"P1", false},
101: {"P2", true}, 103: {"P2", false},
102: {"P3", true}, 106: {"P3", true}}),
poolWanted: makeBlockPool(testBcR, 101,
-[]bpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 120}, {ID: "P3", Height: 130}},
+[]BpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 120}, {ID: "P3", Height: 130}},
map[int64]tPBlocks{
104: {"P1", true}, 105: {"P1", false},
101: {"P2", true}, 103: {"P2", false},
@@ -596,31 +596,31 @@ func TestRemovePeerAtCurrentHeight(t *testing.T) {
tests := []struct {
name string
-pool *blockPool
-poolWanted *blockPool
+pool *BlockPool
+poolWanted *BlockPool
}{
{
name: "one peer, remove peer for block at H",
-pool: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 120}},
+pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}},
map[int64]tPBlocks{100: {"P1", false}, 101: {"P1", true}}),
-poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
+poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}),
},
{
name: "one peer, remove peer for block at H+1",
-pool: makeBlockPool(testBcR, 100, []bpPeer{{ID: "P1", Height: 120}},
+pool: makeBlockPool(testBcR, 100, []BpPeer{{ID: "P1", Height: 120}},
map[int64]tPBlocks{100: {"P1", true}, 101: {"P1", false}}),
-poolWanted: makeBlockPool(testBcR, 100, []bpPeer{}, map[int64]tPBlocks{}),
+poolWanted: makeBlockPool(testBcR, 100, []BpPeer{}, map[int64]tPBlocks{}),
},
{
name: "multiple peers, remove peer for block at H",
pool: makeBlockPool(testBcR, 100,
-[]bpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 120}, {ID: "P3", Height: 130}},
+[]BpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 120}, {ID: "P3", Height: 130}},
map[int64]tPBlocks{
100: {"P1", false}, 104: {"P1", true}, 105: {"P1", false},
101: {"P2", true}, 103: {"P2", false},
102: {"P3", true}, 106: {"P3", true}}),
poolWanted: makeBlockPool(testBcR, 100,
-[]bpPeer{{ID: "P2", Height: 120}, {ID: "P3", Height: 130}},
+[]BpPeer{{ID: "P2", Height: 120}, {ID: "P3", Height: 130}},
map[int64]tPBlocks{
101: {"P2", true}, 103: {"P2", false},
102: {"P3", true}, 106: {"P3", true}}),
@@ -628,13 +628,13 @@
{
name: "multiple peers, remove peer for block at H+1",
pool: makeBlockPool(testBcR, 100,
-[]bpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 120}, {ID: "P3", Height: 130}},
+[]BpPeer{{ID: "P1", Height: 120}, {ID: "P2", Height: 120}, {ID: "P3", Height: 130}},
map[int64]tPBlocks{
100: {"P1", true}, 104: {"P1", true}, 105: {"P1", false},
101: {"P2", false}, 103: {"P2", false},
102: {"P3", true}, 106: {"P3", true}}),
poolWanted: makeBlockPool(testBcR, 100,
-[]bpPeer{{ID: "P1", Height: 120}, {ID: "P3", Height: 130}},
+[]BpPeer{{ID: "P1", Height: 120}, {ID: "P3", Height: 130}},
map[int64]tPBlocks{
100: {"P1", true}, 104: {"P1", true}, 105: {"P1", false},
102: {"P3", true}, 106: {"P3", true}}),

View File

@@ -1,4 +1,4 @@
-package blockchainexp
+package v1
import (
"errors"
@@ -58,7 +58,7 @@ type BlockchainReactor struct {
fastSync bool
-fsm *bReactorFSM
+fsm *BcReactorFSM
blocksSynced int
// Receive goroutine forwards messages to this channel to be processed in the context of the poolRoutine.
@@ -210,7 +210,8 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
msg, err := decodeMsg(msgBytes)
if err != nil {
-bcR.Logger.Error("error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
+bcR.Logger.Error("error decoding message",
+"src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
bcR.Switch.StopPeerForError(src, err)
return
}

View File

@@ -1,4 +1,4 @@
-package blockchainexp
+package v1
import (
"errors"
@@ -12,41 +12,41 @@
)
// Blockchain Reactor State
-type bReactorFSMState struct {
+type bcReactorFSMState struct {
name string
// called when transitioning out of current state
-handle func(*bReactorFSM, bReactorEvent, bReactorEventData) (next *bReactorFSMState, err error)
+handle func(*BcReactorFSM, bReactorEvent, bReactorEventData) (next *bcReactorFSMState, err error)
// called when entering the state
-enter func(fsm *bReactorFSM)
+enter func(fsm *BcReactorFSM)
// timeout to ensure FSM is not stuck in a state forever
// the timer is owned and run by the fsm instance
timeout time.Duration
}
-func (s *bReactorFSMState) String() string {
+func (s *bcReactorFSMState) String() string {
return s.name
}
-// Blockchain Reactor State Machine
-type bReactorFSM struct {
+// BcReactorFSM is the datastructure for the Blockchain Reactor State Machine
+type BcReactorFSM struct {
logger log.Logger
mtx sync.Mutex
startTime time.Time
-state *bReactorFSMState
+state *bcReactorFSMState
stateTimer *time.Timer
-pool *blockPool
+pool *BlockPool
// interface used to call the Blockchain reactor to send StatusRequest, BlockRequest, reporting errors, etc.
toBcR bcReactor
}
// NewFSM creates a new reactor FSM.
-func NewFSM(height int64, toBcR bcReactor) *bReactorFSM {
-return &bReactorFSM{
+func NewFSM(height int64, toBcR bcReactor) *BcReactorFSM {
+return &BcReactorFSM{
state: unknown,
startTime: time.Now(),
pool: NewBlockPool(height, toBcR),
@@ -136,10 +136,10 @@
// states
var (
-unknown *bReactorFSMState
-waitForPeer *bReactorFSMState
-waitForBlock *bReactorFSMState
-finished *bReactorFSMState
+unknown *bcReactorFSMState
+waitForPeer *bcReactorFSMState
+waitForBlock *bcReactorFSMState
+finished *bcReactorFSMState
)
// timeouts for state timers
@@ -170,9 +170,9 @@
)
func init() {
-unknown = &bReactorFSMState{
+unknown = &bcReactorFSMState{
name: "unknown",
-handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) {
+handle: func(fsm *BcReactorFSM, ev bReactorEvent, data bReactorEventData) (*bcReactorFSMState, error) {
switch ev {
case startFSMEv:
// Broadcast Status message. Currently doesn't return non-nil error.
@@ -188,14 +188,14 @@
},
}
-waitForPeer = &bReactorFSMState{
+waitForPeer = &bcReactorFSMState{
name: "waitForPeer",
timeout: waitForPeerTimeout,
-enter: func(fsm *bReactorFSM) {
+enter: func(fsm *BcReactorFSM) {
// Stop when leaving the state.
fsm.resetStateTimer()
},
-handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) {
+handle: func(fsm *BcReactorFSM, ev bReactorEvent, data bReactorEventData) (*bcReactorFSMState, error) {
switch ev {
case stateTimeoutEv:
if data.stateName != "waitForPeer" {
@@ -230,14 +230,14 @@
},
}
-waitForBlock = &bReactorFSMState{
+waitForBlock = &bcReactorFSMState{
name: "waitForBlock",
timeout: waitForBlockAtCurrentHeightTimeout, timeout: waitForBlockAtCurrentHeightTimeout,
enter: func(fsm *bReactorFSM) { enter: func(fsm *BcReactorFSM) {
// Stop when leaving the state. // Stop when leaving the state.
fsm.resetStateTimer() fsm.resetStateTimer()
}, },
handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) { handle: func(fsm *BcReactorFSM, ev bReactorEvent, data bReactorEventData) (*bcReactorFSMState, error) {
switch ev { switch ev {
case statusResponseEv: case statusResponseEv:
@ -246,7 +246,7 @@ func init() {
return waitForPeer, err return waitForPeer, err
} }
if fsm.pool.ReachedMaxHeight() { if fsm.pool.ReachedMaxHeight() {
return finished, nil return finished, err
} }
return waitForBlock, err return waitForBlock, err
@ -259,6 +259,9 @@ func init() {
fsm.pool.RemovePeer(data.peerID, err) fsm.pool.RemovePeer(data.peerID, err)
fsm.toBcR.sendPeerError(err, data.peerID) fsm.toBcR.sendPeerError(err, data.peerID)
} }
if fsm.pool.NumPeers() == 0 {
return waitForPeer, err
}
return waitForBlock, err return waitForBlock, err
case processedBlockEv: case processedBlockEv:
@ -329,14 +332,14 @@ func init() {
}, },
} }
finished = &bReactorFSMState{ finished = &bcReactorFSMState{
name: "finished", name: "finished",
enter: func(fsm *bReactorFSM) { enter: func(fsm *BcReactorFSM) {
fsm.logger.Info("Time to switch to consensus reactor!", "height", fsm.pool.Height) fsm.logger.Info("Time to switch to consensus reactor!", "height", fsm.pool.Height)
fsm.toBcR.switchToConsensus() fsm.toBcR.switchToConsensus()
fsm.cleanup() fsm.cleanup()
}, },
handle: func(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) (*bReactorFSMState, error) { handle: func(fsm *BcReactorFSM, ev bReactorEvent, data bReactorEventData) (*bcReactorFSMState, error) {
return nil, nil return nil, nil
}, },
} }
@ -353,18 +356,19 @@ type bcReactor interface {
switchToConsensus() switchToConsensus()
} }
func (fsm *bReactorFSM) SetLogger(l log.Logger) { // SetLogger sets the FSM logger.
func (fsm *BcReactorFSM) SetLogger(l log.Logger) {
fsm.logger = l fsm.logger = l
fsm.pool.SetLogger(l) fsm.pool.SetLogger(l)
} }
// Starts the FSM. // Start starts the FSM.
func (fsm *bReactorFSM) Start() { func (fsm *BcReactorFSM) Start() {
_ = fsm.Handle(&bcReactorMessage{event: startFSMEv}) _ = fsm.Handle(&bcReactorMessage{event: startFSMEv})
} }
// handle processes messages and events sent to the FSM. // Handle processes messages and events sent to the FSM.
func (fsm *bReactorFSM) Handle(msg *bcReactorMessage) error { func (fsm *BcReactorFSM) Handle(msg *bcReactorMessage) error {
fsm.mtx.Lock() fsm.mtx.Lock()
defer fsm.mtx.Unlock() defer fsm.mtx.Unlock()
fsm.logger.Debug("FSM received", "event", msg, "state", fsm.state) fsm.logger.Debug("FSM received", "event", msg, "state", fsm.state)
@ -386,7 +390,7 @@ func (fsm *bReactorFSM) Handle(msg *bcReactorMessage) error {
return err return err
} }
func (fsm *bReactorFSM) transition(next *bReactorFSMState) { func (fsm *BcReactorFSM) transition(next *bcReactorFSMState) {
if next == nil { if next == nil {
return return
} }
@ -400,30 +404,31 @@ func (fsm *bReactorFSM) transition(next *bReactorFSMState) {
// Called when entering an FSM state in order to detect lack of progress in the state machine. // Called when entering an FSM state in order to detect lack of progress in the state machine.
// Note the use of the 'bcr' interface to facilitate testing without timer expiring. // Note the use of the 'bcr' interface to facilitate testing without timer expiring.
func (fsm *bReactorFSM) resetStateTimer() { func (fsm *BcReactorFSM) resetStateTimer() {
fsm.toBcR.resetStateTimer(fsm.state.name, &fsm.stateTimer, fsm.state.timeout) fsm.toBcR.resetStateTimer(fsm.state.name, &fsm.stateTimer, fsm.state.timeout)
} }
func (fsm *bReactorFSM) isCaughtUp() bool { func (fsm *BcReactorFSM) isCaughtUp() bool {
return fsm.state == finished return fsm.state == finished
} }
func (fsm *bReactorFSM) makeNextRequests(maxNumRequests int) { func (fsm *BcReactorFSM) makeNextRequests(maxNumRequests int) {
fsm.pool.MakeNextRequests(maxNumRequests) fsm.pool.MakeNextRequests(maxNumRequests)
} }
func (fsm *bReactorFSM) cleanup() { func (fsm *BcReactorFSM) cleanup() {
fsm.pool.Cleanup() fsm.pool.Cleanup()
} }
func (fsm *bReactorFSM) NeedsBlocks() bool { // NeedsBlocks checks if more block requests are required.
func (fsm *BcReactorFSM) NeedsBlocks() bool {
fsm.mtx.Lock() fsm.mtx.Lock()
defer fsm.mtx.Unlock() defer fsm.mtx.Unlock()
return fsm.state.name == "waitForBlock" && fsm.pool.NeedsBlocks() return fsm.state.name == "waitForBlock" && fsm.pool.NeedsBlocks()
} }
// NextTwoBlocks returns the two blocks at pool height and height+1 // FirstTwoBlocks returns the two blocks at pool height and height+1
func (fsm *bReactorFSM) FirstTwoBlocks() (first, second *types.Block, err error) { func (fsm *BcReactorFSM) FirstTwoBlocks() (first, second *types.Block, err error) {
fsm.mtx.Lock() fsm.mtx.Lock()
defer fsm.mtx.Unlock() defer fsm.mtx.Unlock()
firstBP, secondBP, err := fsm.pool.FirstTwoBlocksAndPeers() firstBP, secondBP, err := fsm.pool.FirstTwoBlocksAndPeers()
@ -435,7 +440,7 @@ func (fsm *bReactorFSM) FirstTwoBlocks() (first, second *types.Block, err error)
} }
// Status returns the pool's height and the maximum peer height. // Status returns the pool's height and the maximum peer height.
func (fsm *bReactorFSM) Status() (height, maxPeerHeight int64) { func (fsm *BcReactorFSM) Status() (height, maxPeerHeight int64) {
fsm.mtx.Lock() fsm.mtx.Lock()
defer fsm.mtx.Unlock() defer fsm.mtx.Unlock()
return fsm.pool.Height, fsm.pool.MaxPeerHeight return fsm.pool.Height, fsm.pool.MaxPeerHeight
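The renames above keep the FSM's design intact: each state is a value carrying a name, an optional `enter` hook and a `handle` function that returns the next state, and the FSM serializes events under a mutex. A simplified, self-contained sketch of that pattern (toy states and events, not the tendermint types):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type event int

const (
	evStart event = iota
	evPeerAdded
	evStop
)

var errInvalidEvent = errors.New("invalid event for current state")

// state mirrors the shape used above: a name, an optional enter hook
// and a handler that picks the next state.
type state struct {
	name   string
	enter  func(f *fsm)
	handle func(f *fsm, ev event) (*state, error)
}

type fsm struct {
	mtx   sync.Mutex
	state *state
}

// Handle processes one event under the lock and transitions if needed.
func (f *fsm) Handle(ev event) error {
	f.mtx.Lock()
	defer f.mtx.Unlock()
	next, err := f.state.handle(f, ev)
	if next != nil && next != f.state {
		f.state = next
		if next.enter != nil {
			next.enter(f)
		}
	}
	return err
}

var unknown, waitForPeer, finished *state

func init() {
	unknown = &state{
		name: "unknown",
		handle: func(f *fsm, ev event) (*state, error) {
			if ev == evStart {
				return waitForPeer, nil
			}
			return unknown, errInvalidEvent
		},
	}
	waitForPeer = &state{
		name:  "waitForPeer",
		enter: func(f *fsm) { fmt.Println("entered waitForPeer") },
		handle: func(f *fsm, ev event) (*state, error) {
			switch ev {
			case evPeerAdded, evStop:
				return finished, nil
			}
			return waitForPeer, errInvalidEvent
		},
	}
	finished = &state{
		name:  "finished",
		enter: func(f *fsm) { fmt.Println("done") },
		handle: func(f *fsm, ev event) (*state, error) { return nil, nil },
	}
}

func main() {
	f := &fsm{state: unknown}
	_ = f.Handle(evStart)
	_ = f.Handle(evPeerAdded)
	fmt.Println("final state:", f.state.name)
}
```

The real FSM additionally owns a state timer and reports errors back through the `bcReactor` interface, which this sketch omits.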
View File
@ -1,4 +1,4 @@
package blockchainexp package v1
import ( import (
"fmt" "fmt"
@ -25,7 +25,7 @@ type lastPeerErrorT struct {
// reactor for FSM testing // reactor for FSM testing
type testReactor struct { type testReactor struct {
logger log.Logger logger log.Logger
fsm *bReactorFSM fsm *BcReactorFSM
numStatusRequests int numStatusRequests int
numBlockRequests int numBlockRequests int
lastBlockRequest lastBlockRequestT lastBlockRequest lastBlockRequestT
@ -33,106 +33,106 @@ type testReactor struct {
stateTimerStarts map[string]int stateTimerStarts map[string]int
} }
func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) error { func sendEventToFSM(fsm *BcReactorFSM, ev bReactorEvent, data bReactorEventData) error {
return fsm.Handle(&bcReactorMessage{event: ev, data: data}) return fsm.Handle(&bcReactorMessage{event: ev, data: data})
} }
type fsmStepTestValues struct { type fsmStepTestValues struct {
currentState string currentState string
event bReactorEvent event bReactorEvent
data bReactorEventData data bReactorEventData
errWanted error
expectedState string
shouldSendStatusReq bool wantErr error
blockReqIncreased bool wantState string
blocksAdded []int64 wantStatusReqSent bool
peersNotInPool []p2p.ID wantReqIncreased bool
wantNewBlocks []int64
wantRemovedPeers []p2p.ID
} }
// -------------------------------------------- // ---------------------------------------------------------------------------
// helper function to make tests easier to read // helper test function for different FSM events, state and expected behavior
func makeStepStopFSMEv(current, expected string) fsmStepTestValues { func sStopFSMEv(current, expected string) fsmStepTestValues {
return fsmStepTestValues{ return fsmStepTestValues{
currentState: current, currentState: current,
expectedState: expected, event: stopFSMEv,
event: stopFSMEv, wantState: expected,
errWanted: errNoErrorFinished} wantErr: errNoErrorFinished}
} }
func makeStepUnknownFSMEv(current string) fsmStepTestValues { func sUnknownFSMEv(current string) fsmStepTestValues {
return fsmStepTestValues{ return fsmStepTestValues{
currentState: current, currentState: current,
event: 1234, event: 1234,
expectedState: current, wantState: current,
errWanted: errInvalidEvent} wantErr: errInvalidEvent}
} }
func makeStepStartFSMEv() fsmStepTestValues { func sStartFSMEv() fsmStepTestValues {
return fsmStepTestValues{ return fsmStepTestValues{
currentState: "unknown", currentState: "unknown",
event: startFSMEv, event: startFSMEv,
expectedState: "waitForPeer", wantState: "waitForPeer",
shouldSendStatusReq: true} wantStatusReqSent: true}
} }
func makeStepStateTimeoutEv(current, expected string, timedoutState string, errWanted error) fsmStepTestValues { func sStateTimeoutEv(current, expected string, timedoutState string, wantErr error) fsmStepTestValues {
return fsmStepTestValues{ return fsmStepTestValues{
currentState: current, currentState: current,
event: stateTimeoutEv, event: stateTimeoutEv,
data: bReactorEventData{ data: bReactorEventData{
stateName: timedoutState, stateName: timedoutState,
}, },
expectedState: expected, wantState: expected,
errWanted: errWanted, wantErr: wantErr,
} }
} }
func makeStepProcessedBlockEv(current, expected string, reactorError error) fsmStepTestValues { func sProcessedBlockEv(current, expected string, reactorError error) fsmStepTestValues {
return fsmStepTestValues{ return fsmStepTestValues{
currentState: current, currentState: current,
event: processedBlockEv, event: processedBlockEv,
expectedState: expected,
data: bReactorEventData{ data: bReactorEventData{
err: reactorError, err: reactorError,
}, },
errWanted: reactorError, wantState: expected,
wantErr: reactorError,
} }
} }
func makeStepStatusEv(current, expected string, peerID p2p.ID, height int64, err error) fsmStepTestValues { func sStatusEv(current, expected string, peerID p2p.ID, height int64, err error) fsmStepTestValues {
return fsmStepTestValues{ return fsmStepTestValues{
currentState: current, currentState: current,
event: statusResponseEv, event: statusResponseEv,
data: bReactorEventData{peerID: peerID, height: height}, data: bReactorEventData{peerID: peerID, height: height},
expectedState: expected, wantState: expected,
errWanted: err} wantErr: err}
} }
func makeStepMakeRequestsEv(current, expected string, maxPendingRequests int) fsmStepTestValues { func sMakeRequestsEv(current, expected string, maxPendingRequests int) fsmStepTestValues {
return fsmStepTestValues{ return fsmStepTestValues{
currentState: current, currentState: current,
event: makeRequestsEv, event: makeRequestsEv,
data: bReactorEventData{maxNumRequests: maxPendingRequests}, data: bReactorEventData{maxNumRequests: maxPendingRequests},
expectedState: expected, wantState: expected,
blockReqIncreased: true, wantReqIncreased: true,
} }
} }
func makeStepMakeRequestsEvErrored(current, expected string, func sMakeRequestsEvErrored(current, expected string,
maxPendingRequests int, err error, peersRemoved []p2p.ID) fsmStepTestValues { maxPendingRequests int, err error, peersRemoved []p2p.ID) fsmStepTestValues {
return fsmStepTestValues{ return fsmStepTestValues{
currentState: current, currentState: current,
event: makeRequestsEv, event: makeRequestsEv,
data: bReactorEventData{maxNumRequests: maxPendingRequests}, data: bReactorEventData{maxNumRequests: maxPendingRequests},
expectedState: expected, wantState: expected,
errWanted: err, wantErr: err,
peersNotInPool: peersRemoved, wantRemovedPeers: peersRemoved,
blockReqIncreased: true, wantReqIncreased: true,
} }
} }
func makeStepBlockRespEv(current, expected string, peerID p2p.ID, height int64, prevBlocks []int64) fsmStepTestValues { func sBlockRespEv(current, expected string, peerID p2p.ID, height int64, prevBlocks []int64) fsmStepTestValues {
txs := []types.Tx{types.Tx("foo"), types.Tx("bar")} txs := []types.Tx{types.Tx("foo"), types.Tx("bar")}
return fsmStepTestValues{ return fsmStepTestValues{
currentState: current, currentState: current,
@ -142,13 +142,13 @@ func makeStepBlockRespEv(current, expected string, peerID p2p.ID, height int64,
height: height, height: height,
block: types.MakeBlock(int64(height), txs, nil, nil), block: types.MakeBlock(int64(height), txs, nil, nil),
length: 100}, length: 100},
expectedState: expected, wantState: expected,
blocksAdded: append(prevBlocks, height), wantNewBlocks: append(prevBlocks, height),
} }
} }
func makeStepBlockRespEvErrored(current, expected string, func sBlockRespEvErrored(current, expected string,
peerID p2p.ID, height int64, prevBlocks []int64, errWanted error, peersRemoved []p2p.ID) fsmStepTestValues { peerID p2p.ID, height int64, prevBlocks []int64, wantErr error, peersRemoved []p2p.ID) fsmStepTestValues {
txs := []types.Tx{types.Tx("foo"), types.Tx("bar")} txs := []types.Tx{types.Tx("foo"), types.Tx("bar")}
return fsmStepTestValues{ return fsmStepTestValues{
@ -159,14 +159,14 @@ func makeStepBlockRespEvErrored(current, expected string,
height: height, height: height,
block: types.MakeBlock(int64(height), txs, nil, nil), block: types.MakeBlock(int64(height), txs, nil, nil),
length: 100}, length: 100},
expectedState: expected, wantState: expected,
errWanted: errWanted, wantErr: wantErr,
peersNotInPool: peersRemoved, wantRemovedPeers: peersRemoved,
blocksAdded: prevBlocks, wantNewBlocks: prevBlocks,
} }
} }
func makeStepPeerRemoveEv(current, expected string, peerID p2p.ID, err error, peersRemoved []p2p.ID) fsmStepTestValues { func sPeerRemoveEv(current, expected string, peerID p2p.ID, err error, peersRemoved []p2p.ID) fsmStepTestValues {
return fsmStepTestValues{ return fsmStepTestValues{
currentState: current, currentState: current,
event: peerRemoveEv, event: peerRemoveEv,
@ -174,8 +174,8 @@ func makeStepPeerRemoveEv(current, expected string, peerID p2p.ID, err error, pe
peerID: peerID, peerID: peerID,
err: err, err: err,
}, },
expectedState: expected, wantState: expected,
peersNotInPool: peersRemoved, wantRemovedPeers: peersRemoved,
} }
} }
@ -189,7 +189,7 @@ func newTestReactor(height int64) *testReactor {
} }
func fixBlockResponseEvStep(step *fsmStepTestValues, testBcR *testReactor) { func fixBlockResponseEvStep(step *fsmStepTestValues, testBcR *testReactor) {
// There is no good way to know to which peer a block request was sent. // There is currently no good way to know to which peer a block request was sent.
// So in some cases where it does not matter, before we simulate a block response // So in some cases where it does not matter, before we simulate a block response
// we cheat and look where it is expected from. // we cheat and look where it is expected from.
if step.event == blockResponseEv { if step.event == blockResponseEv {
@ -233,21 +233,21 @@ func executeFSMTests(t *testing.T, tests []testFields, matchRespToReq bool) {
} }
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr) assert.Equal(t, step.wantErr, fsmErr)
if step.shouldSendStatusReq { if step.wantStatusReqSent {
assert.Equal(t, oldNumStatusRequests+1, testBcR.numStatusRequests) assert.Equal(t, oldNumStatusRequests+1, testBcR.numStatusRequests)
} else { } else {
assert.Equal(t, oldNumStatusRequests, testBcR.numStatusRequests) assert.Equal(t, oldNumStatusRequests, testBcR.numStatusRequests)
} }
if step.blockReqIncreased { if step.wantReqIncreased {
assert.True(t, oldNumBlockRequests < testBcR.numBlockRequests) assert.True(t, oldNumBlockRequests < testBcR.numBlockRequests)
} else { } else {
assert.Equal(t, oldNumBlockRequests, testBcR.numBlockRequests) assert.Equal(t, oldNumBlockRequests, testBcR.numBlockRequests)
} }
for _, height := range step.blocksAdded { for _, height := range step.wantNewBlocks {
_, err := testBcR.fsm.pool.BlockAndPeerAtHeight(height) _, err := testBcR.fsm.pool.BlockAndPeerAtHeight(height)
assert.Nil(t, err) assert.Nil(t, err)
} }
@ -262,9 +262,9 @@ func executeFSMTests(t *testing.T, tests []testFields, matchRespToReq bool) {
assert.Nil(t, secondAfter) assert.Nil(t, secondAfter)
} }
assert.Equal(t, step.expectedState, testBcR.fsm.state.name) assert.Equal(t, step.wantState, testBcR.fsm.state.name)
if step.expectedState == "finished" { if step.wantState == "finished" {
assert.True(t, testBcR.fsm.isCaughtUp()) assert.True(t, testBcR.fsm.isCaughtUp())
} }
} }
@ -279,20 +279,12 @@ func TestFSMBasic(t *testing.T) {
startingHeight: 1, startingHeight: 1,
maxRequestsPerPeer: 2, maxRequestsPerPeer: 2,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), sStatusEv("waitForPeer", "waitForBlock", "P1", 2, nil),
// statusResponseEv sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 2, nil), sBlockRespEv("waitForBlock", "waitForBlock", "P1", 1, []int64{}),
// makeRequestEv sBlockRespEv("waitForBlock", "waitForBlock", "P2", 2, []int64{1}),
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), sProcessedBlockEv("waitForBlock", "finished", nil),
// blockResponseEv for height 1
makeStepBlockRespEv("waitForBlock", "waitForBlock",
"P1", 1, []int64{}),
// blockResponseEv for height 2
makeStepBlockRespEv("waitForBlock", "waitForBlock",
"P2", 2, []int64{1}),
// processedBlockEv
makeStepProcessedBlockEv("waitForBlock", "finished", nil),
}, },
}, },
{ {
@ -300,27 +292,19 @@ func TestFSMBasic(t *testing.T) {
startingHeight: 1, startingHeight: 1,
maxRequestsPerPeer: 2, maxRequestsPerPeer: 2,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), sStatusEv("waitForPeer", "waitForBlock", "P1", 4, nil),
// statusResponseEv from P1 sStatusEv("waitForBlock", "waitForBlock", "P2", 4, nil),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 4, nil), sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
// statusResponseEv from P2
makeStepStatusEv("waitForBlock", "waitForBlock", "P2", 4, nil),
// makeRequestEv
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
// blockResponseEv for height 1 sBlockRespEv("waitForBlock", "waitForBlock", "P1", 1, []int64{}),
makeStepBlockRespEv("waitForBlock", "waitForBlock", "P1", 1, []int64{}), sBlockRespEv("waitForBlock", "waitForBlock", "P1", 2, []int64{1}),
// blockResponseEv for height 2 sBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{1, 2}),
makeStepBlockRespEv("waitForBlock", "waitForBlock", "P1", 2, []int64{1}), sBlockRespEv("waitForBlock", "waitForBlock", "P2", 4, []int64{1, 2, 3}),
// blockResponseEv for height 3
makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{1, 2}), sProcessedBlockEv("waitForBlock", "waitForBlock", nil),
// blockResponseEv for height 4 sProcessedBlockEv("waitForBlock", "waitForBlock", nil),
makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 4, []int64{1, 2, 3}), sProcessedBlockEv("waitForBlock", "finished", nil),
// processedBlockEv
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil),
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil),
makeStepProcessedBlockEv("waitForBlock", "finished", nil),
}, },
}, },
} }
@ -335,40 +319,30 @@ func TestFSMBlockVerificationFailure(t *testing.T) {
startingHeight: 1, startingHeight: 1,
maxRequestsPerPeer: 3, maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(),
// statusResponseEv from P1
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
// makeRequestEv // add P1 and get blocks 1-3 from it
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
// blockResponseEv for height 1 sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
makeStepBlockRespEv("waitForBlock", "waitForBlock", sBlockRespEv("waitForBlock", "waitForBlock", "P1", 1, []int64{}),
"P1", 1, []int64{}), sBlockRespEv("waitForBlock", "waitForBlock", "P1", 2, []int64{1}),
// blockResponseEv for height 2 sBlockRespEv("waitForBlock", "waitForBlock", "P1", 3, []int64{1, 2}),
makeStepBlockRespEv("waitForBlock", "waitForBlock",
"P1", 2, []int64{1}),
// blockResponseEv for height 3
makeStepBlockRespEv("waitForBlock", "waitForBlock",
"P1", 3, []int64{1, 2}),
// statusResponseEv from P2 // add P2
makeStepStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil), sStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil),
// processedBlockEv with Error // process block failure, should remove P1 and all blocks
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", errBlockVerificationFailure), sProcessedBlockEv("waitForBlock", "waitForBlock", errBlockVerificationFailure),
// makeRequestEv // get blocks 1-3 from P2
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
makeStepBlockRespEv("waitForBlock", "waitForBlock", sBlockRespEv("waitForBlock", "waitForBlock", "P2", 1, []int64{}),
"P2", 1, []int64{}), sBlockRespEv("waitForBlock", "waitForBlock", "P2", 2, []int64{1}),
// blockResponseEv for height 2 sBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{1, 2}),
makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 2, []int64{1}),
// blockResponseEv for height 3
makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{1, 2}),
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil), // finish after processing blocks 1 and 2
makeStepProcessedBlockEv("waitForBlock", "finished", nil), sProcessedBlockEv("waitForBlock", "waitForBlock", nil),
sProcessedBlockEv("waitForBlock", "finished", nil),
}, },
}, },
} }
@ -383,14 +357,13 @@ func TestFSMBadBlockFromPeer(t *testing.T) {
startingHeight: 1, startingHeight: 1,
maxRequestsPerPeer: 3, maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), // add P1 and ask for blocks 1-3
// statusResponseEv from P1 sStatusEv("waitForPeer", "waitForBlock", "P1", 300, nil),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 300, nil), sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
// makeRequestEv
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), // blockResponseEv for height 100 should cause an error
// blockResponseEv for height 100 sBlockRespEvErrored("waitForBlock", "waitForPeer",
makeStepBlockRespEvErrored("waitForBlock", "waitForBlock",
"P1", 100, []int64{}, errMissingRequest, []p2p.ID{}), "P1", 100, []int64{}, errMissingRequest, []p2p.ID{}),
}, },
}, },
@ -399,19 +372,16 @@ func TestFSMBadBlockFromPeer(t *testing.T) {
startingHeight: 1, startingHeight: 1,
maxRequestsPerPeer: 3, maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
makeStepStartFSMEv(), sStartFSMEv(),
// add P1 and get block 1
// statusResponseEv from P1 sStatusEv("waitForPeer", "waitForBlock", "P1", 100, nil),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 100, nil), sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
sBlockRespEv("waitForBlock", "waitForBlock",
// request and add block 1
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
makeStepBlockRespEv("waitForBlock", "waitForBlock",
"P1", 1, []int64{}), "P1", 1, []int64{}),
// simulate error in this step. Since peer is removed together with block 1, // Get block 1 again. Since peer is removed together with block 1,
// the blocks present in the pool will be {} // the blocks present in the pool should be {}
makeStepBlockRespEvErrored("waitForBlock", "waitForBlock", sBlockRespEvErrored("waitForBlock", "waitForPeer",
"P1", 1, []int64{}, errDuplicateBlock, []p2p.ID{"P1"}), "P1", 1, []int64{}, errDuplicateBlock, []p2p.ID{"P1"}),
}, },
}, },
@ -420,14 +390,13 @@ func TestFSMBadBlockFromPeer(t *testing.T) {
startingHeight: 1, startingHeight: 1,
maxRequestsPerPeer: 3, maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), // add P1 and get block 1
// statusResponseEv from P1 sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
// makeRequestEv // get block 1 from unknown peer P2
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
makeStepBlockRespEvErrored("waitForBlock", "waitForBlock", sBlockRespEvErrored("waitForBlock", "waitForBlock",
"P2", 1, []int64{}, errBadDataFromPeer, []p2p.ID{"P2"}), "P2", 1, []int64{}, errBadDataFromPeer, []p2p.ID{"P2"}),
}, },
}, },
@ -436,17 +405,16 @@ func TestFSMBadBlockFromPeer(t *testing.T) {
startingHeight: 1, startingHeight: 1,
maxRequestsPerPeer: 3, maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), // add P1, make requests for blocks 1-3 to P1
// statusResponseEv from P1 sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
// makeRequestEv
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
// add another peer // add P2
makeStepStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil), sStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil),
makeStepBlockRespEvErrored("waitForBlock", "waitForBlock", // receive block 1 from P2
sBlockRespEvErrored("waitForBlock", "waitForBlock",
"P2", 1, []int64{}, errBadDataFromPeer, []p2p.ID{"P2"}), "P2", 1, []int64{}, errBadDataFromPeer, []p2p.ID{"P2"}),
}, },
}, },
@ -462,32 +430,27 @@ func TestFSMBlockAtCurrentHeightDoesNotArriveInTime(t *testing.T) {
startingHeight: 1, startingHeight: 1,
maxRequestsPerPeer: 3, maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), // add P1, get blocks 1 and 2, process block 1
// statusResponseEv from P1 sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
// make requests (blocks 1-3 to P1) sBlockRespEv("waitForBlock", "waitForBlock",
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
// blocks received for heights 1 and 2
makeStepBlockRespEv("waitForBlock", "waitForBlock",
"P1", 1, []int64{}), "P1", 1, []int64{}),
makeStepBlockRespEv("waitForBlock", "waitForBlock", sBlockRespEv("waitForBlock", "waitForBlock",
"P1", 2, []int64{1}), "P1", 2, []int64{1}),
// processed block at height 1 sProcessedBlockEv("waitForBlock", "waitForBlock", nil),
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil),
// add peer P2 // add P2
makeStepStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil), sStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil),
// timeout on block at height 3, P1 will be removed
makeStepStateTimeoutEv("waitForBlock", "waitForBlock", "waitForBlock", errNoPeerResponseForCurrentHeights),
// make some requests (should include redo-s for blocks 2 and 3) // timeout on block 3, P1 should be removed
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), sStateTimeoutEv("waitForBlock", "waitForBlock", "waitForBlock", errNoPeerResponseForCurrentHeights),
// block received for height 2 from P2
makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 2, []int64{}), // make requests and finish by receiving blocks 2 and 3 from P2
// block received for height 3 from P2 sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
makeStepBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{2}), sBlockRespEv("waitForBlock", "waitForBlock", "P2", 2, []int64{}),
makeStepProcessedBlockEv("waitForBlock", "finished", nil), sBlockRespEv("waitForBlock", "waitForBlock", "P2", 3, []int64{2}),
sProcessedBlockEv("waitForBlock", "finished", nil),
}, },
}, },
{ {
@ -495,28 +458,26 @@ func TestFSMBlockAtCurrentHeightDoesNotArriveInTime(t *testing.T) {
startingHeight: 1, startingHeight: 1,
maxRequestsPerPeer: 3, maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), // add P1, request blocks 1-3 from P1
// statusResponseEv from P1 sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
// make some requests (blocks 1-3 to P1)
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
// add peer P2 // add P2 (tallest)
makeStepStatusEv("waitForBlock", "waitForBlock", "P2", 30, nil), sStatusEv("waitForBlock", "waitForBlock", "P2", 30, nil),
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
// blocks received for heights 1-3 // receive blocks 1-3 from P1
makeStepBlockRespEv("waitForBlock", "waitForBlock", "P1", 1, []int64{}), sBlockRespEv("waitForBlock", "waitForBlock", "P1", 1, []int64{}),
makeStepBlockRespEv("waitForBlock", "waitForBlock", "P1", 2, []int64{1}), sBlockRespEv("waitForBlock", "waitForBlock", "P1", 2, []int64{1}),
makeStepBlockRespEv("waitForBlock", "waitForBlock", "P1", 3, []int64{1, 2}), sBlockRespEv("waitForBlock", "waitForBlock", "P1", 3, []int64{1, 2}),
// process blocks at heights 1 and 2 // process blocks at heights 1 and 2
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil), sProcessedBlockEv("waitForBlock", "waitForBlock", nil),
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil), sProcessedBlockEv("waitForBlock", "waitForBlock", nil),
// timeout on block at height 4 // timeout on block at height 4
makeStepStateTimeoutEv("waitForBlock", "finished", "waitForBlock", nil), sStateTimeoutEv("waitForBlock", "finished", "waitForBlock", nil),
}, },
}, },
} }
@ -530,28 +491,26 @@ func TestFSMPeerRelatedEvents(t *testing.T) {
name: "peer remove event with no blocks", name: "peer remove event with no blocks",
startingHeight: 1, startingHeight: 1,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), // add P1, P2, P3
// statusResponseEv from P1 sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), sStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil),
// statusResponseEv from P2 sStatusEv("waitForBlock", "waitForBlock", "P3", 3, nil),
makeStepStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil),
// statusResponseEv from P3 // switch removes P2
makeStepStatusEv("waitForBlock", "waitForBlock", "P3", 3, nil), sPeerRemoveEv("waitForBlock", "waitForBlock", "P2", errSwitchRemovesPeer, []p2p.ID{"P2"}),
makeStepPeerRemoveEv("waitForBlock", "waitForBlock", "P2", errSwitchRemovesPeer, []p2p.ID{"P2"}),
}, },
}, },
{ {
name: "only peer removed while in waitForBlock state", name: "only peer removed while in waitForBlock state",
startingHeight: 100, startingHeight: 100,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), // add P1
// statusResponseEv from P1 sStatusEv("waitForPeer", "waitForBlock", "P1", 200, nil),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 200, nil),
// switch removes peer P1 // switch removes P1
makeStepPeerRemoveEv("waitForBlock", "waitForPeer", "P1", errSwitchRemovesPeer, []p2p.ID{"P1"}), sPeerRemoveEv("waitForBlock", "waitForPeer", "P1", errSwitchRemovesPeer, []p2p.ID{"P1"}),
}, },
}, },
{ {
@ -559,88 +518,70 @@ func TestFSMPeerRelatedEvents(t *testing.T) {
startingHeight: 100, startingHeight: 100,
maxRequestsPerPeer: 3, maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), // add P1 and make requests
// statusResponseEv from P1 sStatusEv("waitForPeer", "waitForBlock", "P1", 101, nil),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 101, nil), sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), // add P2
sStatusEv("waitForBlock", "waitForBlock", "P2", 200, nil),
// statusResponseEv from P2 // get blocks 100 and 101 from P1 and process block at height 100
makeStepStatusEv("waitForBlock", "waitForBlock", "P2", 200, nil), sBlockRespEv("waitForBlock", "waitForBlock", "P1", 100, []int64{}),
sBlockRespEv("waitForBlock", "waitForBlock", "P1", 101, []int64{100}),
sProcessedBlockEv("waitForBlock", "waitForBlock", nil),
// get blocks from P1 // switch removes peer P1, should be finished
makeStepBlockRespEv("waitForBlock", "waitForBlock", sPeerRemoveEv("waitForBlock", "finished", "P2", errSwitchRemovesPeer, []p2p.ID{"P2"}),
"P1", 100, []int64{}),
makeStepBlockRespEv("waitForBlock", "waitForBlock",
"P1", 101, []int64{100}),
// processed block at heights 1 and 2
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil),
// switch removes peer P1
makeStepPeerRemoveEv("waitForBlock", "finished", "P2", errSwitchRemovesPeer, []p2p.ID{"P2"}),
}, },
}, },
{ {
name: "highest peer becomes short while in waitForBlock state, node reaches maxPeerHeight - TS4", name: "highest peer lowers its height in waitForBlock state, node reaches maxPeerHeight - TS4",
startingHeight: 100, startingHeight: 100,
maxRequestsPerPeer: 3, maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), // add P1 and make requests
// statusResponseEv from P1 sStatusEv("waitForPeer", "waitForBlock", "P1", 101, nil),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 101, nil), sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
// statusResponseEv from P2 // add P2
makeStepStatusEv("waitForBlock", "waitForBlock", "P2", 200, nil), sStatusEv("waitForBlock", "waitForBlock", "P2", 200, nil),
// get blocks from P1 // get blocks 100 and 101 from P1
makeStepBlockRespEv("waitForBlock", "waitForBlock", sBlockRespEv("waitForBlock", "waitForBlock", "P1", 100, []int64{}),
"P1", 100, []int64{}), sBlockRespEv("waitForBlock", "waitForBlock", "P1", 101, []int64{100}),
makeStepBlockRespEv("waitForBlock", "waitForBlock",
"P1", 101, []int64{100}),
// processed block at heights 1 and 2 // processed block at heights 100
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil), sProcessedBlockEv("waitForBlock", "waitForBlock", nil),
// P1 becomes short // P2 becomes short
makeStepStatusEv("waitForBlock", "finished", "P2", 100, nil), sStatusEv("waitForBlock", "finished", "P2", 100, errPeerLowersItsHeight),
}, },
}, },
{ {
name: "new short peer while in waitForPeer state", name: "new short peer while in waitForPeer state",
startingHeight: 100, startingHeight: 100,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), sStatusEv("waitForPeer", "waitForPeer", "P1", 3, errPeerTooShort),
// statusResponseEv from P1
makeStepStatusEv("waitForPeer", "waitForPeer", "P1", 3, errPeerTooShort),
}, },
}, },
{ {
name: "new short peer while in waitForBlock state", name: "new short peer while in waitForBlock state",
startingHeight: 100, startingHeight: 100,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), sStatusEv("waitForPeer", "waitForBlock", "P1", 200, nil),
// statusResponseEv from P1 sStatusEv("waitForBlock", "waitForBlock", "P2", 3, errPeerTooShort),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 200, nil),
// statusResponseEv from P2
makeStepStatusEv("waitForBlock", "waitForBlock", "P2", 3, errPeerTooShort),
}, },
}, },
{ {
name: "only peer updated with low height while in waitForBlock state", name: "only peer updated with low height while in waitForBlock state",
startingHeight: 100, startingHeight: 100,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), sStatusEv("waitForPeer", "waitForBlock", "P1", 200, nil),
// statusResponseEv from P1 sStatusEv("waitForBlock", "waitForPeer", "P1", 3, errPeerLowersItsHeight),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 200, nil),
// statusResponseEv from P1
makeStepStatusEv("waitForBlock", "waitForPeer", "P1", 3, errPeerLowersItsHeight),
}, },
}, },
{ {
@ -648,11 +589,13 @@ func TestFSMPeerRelatedEvents(t *testing.T) {
startingHeight: 9999999, startingHeight: 9999999,
maxRequestsPerPeer: 3, maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), // add P1
// statusResponseEv from P1 sStatusEv("waitForPeer", "waitForBlock", "P1", 20000000, nil),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 20000000, nil), // send request for block 9999999
makeStepMakeRequestsEvErrored("waitForBlock", "waitForBlock", // Note: For this block request the "switch missing the peer" error is simulated,
// see implementation of bcReactor interface, sendBlockRequest(), in this file.
sMakeRequestsEvErrored("waitForBlock", "waitForBlock",
maxNumRequests, nil, []p2p.ID{"P1"}), maxNumRequests, nil, []p2p.ID{"P1"}),
}, },
}, },
@ -666,29 +609,24 @@ func TestFSMStopFSM(t *testing.T) {
{ {
name: "stopFSMEv in unknown", name: "stopFSMEv in unknown",
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStopFSMEv("unknown", "finished"),
makeStepStopFSMEv("unknown", "finished"),
}, },
}, },
{ {
name: "stopFSMEv in waitForPeer", name: "stopFSMEv in waitForPeer",
startingHeight: 1, startingHeight: 1,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), sStopFSMEv("waitForPeer", "finished"),
// stopFSMEv
makeStepStopFSMEv("waitForPeer", "finished"),
}, },
}, },
{ {
name: "stopFSMEv in waitForBlock", name: "stopFSMEv in waitForBlock",
startingHeight: 1, startingHeight: 1,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
// startFSMEv sStartFSMEv(),
makeStepStartFSMEv(), sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
// statusResponseEv from P1 sStopFSMEv("waitForBlock", "finished"),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
makeStepStopFSMEv("waitForBlock", "finished"),
}, },
}, },
} }
@ -701,23 +639,23 @@ func TestFSMUnknownElements(t *testing.T) {
{ {
name: "unknown event for state unknown", name: "unknown event for state unknown",
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
makeStepUnknownFSMEv("unknown"), sUnknownFSMEv("unknown"),
}, },
}, },
{ {
name: "unknown event for state waitForPeer", name: "unknown event for state waitForPeer",
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
makeStepStartFSMEv(), sStartFSMEv(),
makeStepUnknownFSMEv("waitForPeer"), sUnknownFSMEv("waitForPeer"),
}, },
}, },
{ {
name: "unknown event for state waitForBlock", name: "unknown event for state waitForBlock",
startingHeight: 1, startingHeight: 1,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
makeStepStartFSMEv(), sStartFSMEv(),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
makeStepUnknownFSMEv("waitForBlock"), sUnknownFSMEv("waitForBlock"),
}, },
}, },
} }
@ -732,8 +670,8 @@ func TestFSMPeerStateTimeoutEvent(t *testing.T) {
startingHeight: 1, startingHeight: 1,
maxRequestsPerPeer: 3, maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
makeStepStartFSMEv(), sStartFSMEv(),
makeStepStateTimeoutEv("waitForPeer", "finished", "waitForPeer", errNoTallerPeer), sStateTimeoutEv("waitForPeer", "finished", "waitForPeer", errNoTallerPeer),
}, },
}, },
{ {
@ -741,8 +679,8 @@ func TestFSMPeerStateTimeoutEvent(t *testing.T) {
startingHeight: 1, startingHeight: 1,
maxRequestsPerPeer: 3, maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
makeStepStartFSMEv(), sStartFSMEv(),
makeStepStateTimeoutEv("waitForPeer", "waitForPeer", "waitForBlock", errTimeoutEventWrongState), sStateTimeoutEv("waitForPeer", "waitForPeer", "waitForBlock", errTimeoutEventWrongState),
}, },
}, },
{ {
@ -750,10 +688,10 @@ func TestFSMPeerStateTimeoutEvent(t *testing.T) {
startingHeight: 1, startingHeight: 1,
maxRequestsPerPeer: 3, maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
makeStepStartFSMEv(), sStartFSMEv(),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
makeStepStateTimeoutEv("waitForBlock", "waitForPeer", "waitForBlock", errNoPeerResponseForCurrentHeights), sStateTimeoutEv("waitForBlock", "waitForPeer", "waitForBlock", errNoPeerResponseForCurrentHeights),
}, },
}, },
{ {
@ -761,10 +699,10 @@ func TestFSMPeerStateTimeoutEvent(t *testing.T) {
startingHeight: 1, startingHeight: 1,
maxRequestsPerPeer: 3, maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
makeStepStartFSMEv(), sStartFSMEv(),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
makeStepStateTimeoutEv("waitForBlock", "waitForBlock", "waitForPeer", errTimeoutEventWrongState), sStateTimeoutEv("waitForBlock", "waitForBlock", "waitForPeer", errTimeoutEventWrongState),
}, },
}, },
{ {
@ -772,11 +710,11 @@ func TestFSMPeerStateTimeoutEvent(t *testing.T) {
startingHeight: 1, startingHeight: 1,
maxRequestsPerPeer: 3, maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{ steps: []fsmStepTestValues{
makeStepStartFSMEv(), sStartFSMEv(),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil), sStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
makeStepStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil), sStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil),
makeStepStateTimeoutEv("waitForBlock", "waitForBlock", "waitForBlock", errNoPeerResponseForCurrentHeights), sStateTimeoutEv("waitForBlock", "waitForBlock", "waitForBlock", errNoPeerResponseForCurrentHeights),
}, },
}, },
} }
@ -808,7 +746,7 @@ func makeCorrectTransitionSequence(startingHeight int64, numBlocks int64, numPee
numBlocks, startingHeight, numPeers, maxRequestsPerPeer, maxPendingRequests) numBlocks, startingHeight, numPeers, maxRequestsPerPeer, maxPendingRequests)
// Add startFSMEv step. // Add startFSMEv step.
testSteps = append(testSteps, makeStepStartFSMEv()) testSteps = append(testSteps, sStartFSMEv())
// For each peer, add statusResponseEv step. // For each peer, add statusResponseEv step.
for i := 0; i < numPeers; i++ { for i := 0; i < numPeers; i++ {
@ -816,10 +754,10 @@ func makeCorrectTransitionSequence(startingHeight int64, numBlocks int64, numPee
if i == 0 { if i == 0 {
testSteps = append( testSteps = append(
testSteps, testSteps,
makeStepStatusEv("waitForPeer", "waitForBlock", p2p.ID(peerName), peerHeights[i], nil)) sStatusEv("waitForPeer", "waitForBlock", p2p.ID(peerName), peerHeights[i], nil))
} else { } else {
testSteps = append(testSteps, testSteps = append(testSteps,
makeStepStatusEv("waitForBlock", "waitForBlock", p2p.ID(peerName), peerHeights[i], nil)) sStatusEv("waitForBlock", "waitForBlock", p2p.ID(peerName), peerHeights[i], nil))
} }
} }
@ -834,14 +772,14 @@ forLoop:
if i%int(maxRequestsPerPeer) == 0 { if i%int(maxRequestsPerPeer) == 0 {
testSteps = append( testSteps = append(
testSteps, testSteps,
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests), sMakeRequestsEv("waitForBlock", "waitForBlock", maxNumRequests),
) )
} }
// Add the blockRespEv step // Add the blockRespEv step
testSteps = append( testSteps = append(
testSteps, testSteps,
makeStepBlockRespEv("waitForBlock", "waitForBlock", sBlockRespEv("waitForBlock", "waitForBlock",
"P0", height, prevBlocks)) "P0", height, prevBlocks))
prevBlocks = append(prevBlocks, height) prevBlocks = append(prevBlocks, height)
height++ height++
@ -854,12 +792,12 @@ forLoop:
// This is the last block that is processed, we should be in "finished" state. // This is the last block that is processed, we should be in "finished" state.
testSteps = append( testSteps = append(
testSteps, testSteps,
makeStepProcessedBlockEv("waitForBlock", "finished", nil)) sProcessedBlockEv("waitForBlock", "finished", nil))
break forLoop break forLoop
} }
testSteps = append( testSteps = append(
testSteps, testSteps,
makeStepProcessedBlockEv("waitForBlock", "waitForBlock", nil)) sProcessedBlockEv("waitForBlock", "waitForBlock", nil))
} }
numBlocksReceived = 0 numBlocksReceived = 0
prevBlocks = make([]int64, 0, maxPendingRequests) prevBlocks = make([]int64, 0, maxPendingRequests)
@ -942,16 +880,16 @@ func TestFSMCorrectTransitionSequences(t *testing.T) {
} }
fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data) fsmErr := sendEventToFSM(testBcR.fsm, step.event, step.data)
assert.Equal(t, step.errWanted, fsmErr) assert.Equal(t, step.wantErr, fsmErr)
if step.shouldSendStatusReq { if step.wantStatusReqSent {
assert.Equal(t, oldNumStatusRequests+1, testBcR.numStatusRequests) assert.Equal(t, oldNumStatusRequests+1, testBcR.numStatusRequests)
} else { } else {
assert.Equal(t, oldNumStatusRequests, testBcR.numStatusRequests) assert.Equal(t, oldNumStatusRequests, testBcR.numStatusRequests)
} }
assert.Equal(t, step.expectedState, testBcR.fsm.state.name) assert.Equal(t, step.wantState, testBcR.fsm.state.name)
if step.expectedState == "finished" { if step.wantState == "finished" {
assert.True(t, testBcR.fsm.isCaughtUp()) assert.True(t, testBcR.fsm.isCaughtUp())
} }
} }
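The test helpers above were renamed to short step builders (`sStartFSMEv`, `sStatusEv`, ...) that fill a step struct with the event to send and the `want*` expectations. A minimal table-driven sketch of the same idea, driving a toy state function instead of the real FSM:

```go
package fsm

import "testing"

// step mirrors the fsmStepTestValues shape above: the event to send,
// plus the wanted state and error after handling it.
type step struct {
	event     string
	wantState string
	wantErr   error
}

// sStart and sStop are builder helpers in the style of sStartFSMEv / sStopFSMEv.
func sStart(expected string) step { return step{event: "start", wantState: expected} }
func sStop(expected string) step  { return step{event: "stop", wantState: expected} }

// toyHandle is a stand-in state machine: state x event -> next state.
func toyHandle(state, event string) (string, error) {
	switch {
	case state == "unknown" && event == "start":
		return "waitForPeer", nil
	case event == "stop":
		return "finished", nil
	}
	return state, nil
}

func TestToySteps(t *testing.T) {
	tests := []struct {
		name  string
		steps []step
	}{
		{name: "start then stop", steps: []step{sStart("waitForPeer"), sStop("finished")}},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			state := "unknown"
			for _, s := range tt.steps {
				next, err := toyHandle(state, s.event)
				if err != s.wantErr {
					t.Fatalf("event %q: got err %v, want %v", s.event, err, s.wantErr)
				}
				if next != s.wantState {
					t.Fatalf("event %q: got state %q, want %q", s.event, next, s.wantState)
				}
				state = next
			}
		})
	}
}
```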
View File
@ -1,4 +1,4 @@
package blockchainexp package v1
import ( import (
"fmt" "fmt"
@ -157,7 +157,7 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
defer os.RemoveAll(config.RootDir) defer os.RemoveAll(config.RootDir)
genDoc, privVals := randGenesisDoc(1, false, 30) genDoc, privVals := randGenesisDoc(1, false, 30)
maxBlockHeight := int64(500) maxBlockHeight := int64(65)
reactorPairs := make([]BlockchainReactorPair, 2) reactorPairs := make([]BlockchainReactorPair, 2)
@ -193,7 +193,7 @@ func TestFastSyncNoBlockResponse(t *testing.T) {
} }
for { for {
time.Sleep(1 * time.Second) time.Sleep(10 * time.Millisecond)
reactorPairs[1].conR.mtx.Lock() reactorPairs[1].conR.mtx.Lock()
if reactorPairs[1].conR.switchedToConsensus { if reactorPairs[1].conR.switchedToConsensus {
reactorPairs[1].conR.mtx.Unlock() reactorPairs[1].conR.mtx.Unlock()
@ -265,7 +265,7 @@ func TestFastSyncBadBlockStopsPeer(t *testing.T) {
outerFor: outerFor:
for { for {
time.Sleep(1 * time.Second) time.Sleep(10 * time.Millisecond)
for i := 0; i < numNodes; i++ { for i := 0; i < numNodes; i++ {
reactorPairs[i].conR.mtx.Lock() reactorPairs[i].conR.mtx.Lock()
if !reactorPairs[i].conR.switchedToConsensus { if !reactorPairs[i].conR.switchedToConsensus {
@ -274,7 +274,6 @@ outerFor:
} }
reactorPairs[i].conR.mtx.Unlock() reactorPairs[i].conR.mtx.Unlock()
} }
break break
} }
@ -318,112 +317,6 @@ outerFor:
assert.True(t, lastReactorPair.bcR.Switch.Peers().Size() < len(reactorPairs)-1) assert.True(t, lastReactorPair.bcR.Switch.Peers().Size() < len(reactorPairs)-1)
} }
func setupReactors(
numReactors int, maxBlockHeight int64,
genDoc *types.GenesisDoc, privVals []types.PrivValidator) ([]BlockchainReactorPair, []*p2p.Switch) {
defer os.RemoveAll(config.RootDir)
reactorPairs := make([]BlockchainReactorPair, numReactors)
var logger = make([]log.Logger, numReactors)
for i := 0; i < numReactors; i++ {
logger[i] = log.TestingLogger()
height := int64(0)
if i == 0 {
height = maxBlockHeight
}
reactorPairs[i] = newBlockchainReactorPair(logger[i], genDoc, privVals, height)
}
switches := p2p.MakeConnectedSwitches(config.P2P, numReactors, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[i].bcR)
s.AddReactor("CONSENSUS", reactorPairs[i].conR)
moduleName := fmt.Sprintf("blockchain-%v", i)
reactorPairs[i].bcR.SetLogger(logger[i].With("module", moduleName))
return s
}, p2p.Connect2Switches)
return reactorPairs, switches
}
// WIP - used for some scale testing, will remove
func TestFastSyncMultiNode(t *testing.T) {
numNodes := 6
maxHeight := int64(500)
//numNodes := 20
//maxHeight := int64(10000)
config = cfg.ResetTestRoot("blockchain_reactor_test")
genDoc, privVals := randGenesisDoc(1, false, 30)
reactorPairs, switches := setupReactors(numNodes, maxHeight, genDoc, privVals)
defer func() {
for _, r := range reactorPairs {
_ = r.bcR.Stop()
_ = r.conR.Stop()
}
}()
outerFor:
for {
time.Sleep(1 * time.Second)
for i := 0; i < numNodes; i++ {
reactorPairs[i].conR.mtx.Lock()
if !reactorPairs[i].conR.switchedToConsensus {
reactorPairs[i].conR.mtx.Unlock()
continue outerFor
}
reactorPairs[i].conR.mtx.Unlock()
}
break
}
//at this time, reactors[0-3] are the newest
assert.Equal(t, numNodes-1, reactorPairs[0].bcR.Switch.Peers().Size())
lastLogger := log.TestingLogger()
lastReactorPair := newBlockchainReactorPair(lastLogger, genDoc, privVals, 0)
reactorPairs = append(reactorPairs, lastReactorPair)
switches = append(switches, p2p.MakeConnectedSwitches(config.P2P, 1, func(i int, s *p2p.Switch) *p2p.Switch {
s.AddReactor("BLOCKCHAIN", reactorPairs[len(reactorPairs)-1].bcR)
s.AddReactor("CONSENSUS", reactorPairs[len(reactorPairs)-1].conR)
moduleName := fmt.Sprintf("blockchainTEST-%d", len(reactorPairs)-1)
reactorPairs[len(reactorPairs)-1].bcR.SetLogger(lastLogger.With("module", moduleName))
return s
}, p2p.Connect2Switches)...)
start := time.Now()
for i := 0; i < len(reactorPairs)-1; i++ {
p2p.Connect2Switches(switches, i, len(reactorPairs)-1)
}
for {
time.Sleep(1 * time.Second)
lastReactorPair.conR.mtx.Lock()
if lastReactorPair.conR.switchedToConsensus {
fmt.Println("FAST SYNC Duration", time.Since(start))
lastReactorPair.conR.mtx.Unlock()
break
}
lastReactorPair.conR.mtx.Unlock()
if lastReactorPair.bcR.Switch.Peers().Size() == 0 {
break
}
}
assert.True(t, lastReactorPair.bcR.Switch.Peers().Size() < len(reactorPairs))
}
//---------------------------------------------- //----------------------------------------------
// utility funcs // utility funcs
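The reactor tests above wait for fast sync to finish by sleeping in a loop until `switchedToConsensus` is set; the commit only shortens the sleep from 1s to 10ms. A generic version of that wait, sketched with an explicit timeout (names and durations are illustrative):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// waitFor polls cond every interval until it returns true or the
// deadline passes -- the same shape as the sleep-and-check loops in
// the tests above, but bounded so a broken node cannot hang the test.
func waitFor(cond func() bool, interval, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		if cond() {
			return nil
		}
		if time.Now().After(deadline) {
			return errors.New("condition not met before timeout")
		}
		time.Sleep(interval)
	}
}

func main() {
	var mtx sync.Mutex
	switched := false

	// Simulate a reactor switching to consensus after a short delay.
	go func() {
		time.Sleep(50 * time.Millisecond)
		mtx.Lock()
		switched = true
		mtx.Unlock()
	}()

	err := waitFor(func() bool {
		mtx.Lock()
		defer mtx.Unlock()
		return switched
	}, 10*time.Millisecond, time.Second)
	fmt.Println("switched to consensus:", err == nil)
}
```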
View File
@ -1,4 +1,4 @@
package blockchainexp package v1
import ( import (
amino "github.com/tendermint/go-amino" amino "github.com/tendermint/go-amino"
View File
@ -19,7 +19,7 @@ func AddNodeFlags(cmd *cobra.Command) {
cmd.Flags().String("priv_validator_laddr", config.PrivValidatorListenAddr, "Socket address to listen on for connections from external priv_validator process") cmd.Flags().String("priv_validator_laddr", config.PrivValidatorListenAddr, "Socket address to listen on for connections from external priv_validator process")
// node flags // node flags
cmd.Flags().Bool("fast_sync", config.FastSync, "Fast blockchain syncing") cmd.Flags().Bool("fast_sync", config.FastSyncMode, "Fast blockchain syncing")
// abci flags // abci flags
cmd.Flags().String("proxy_app", config.ProxyApp, "Proxy app address, or one of: 'kvstore', 'persistent_kvstore', 'counter', 'counter_serial' or 'noop' for local testing.") cmd.Flags().String("proxy_app", config.ProxyApp, "Proxy app address, or one of: 'kvstore', 'persistent_kvstore', 'counter', 'counter_serial' or 'noop' for local testing.")
View File
@ -64,7 +64,7 @@ type Config struct {
RPC *RPCConfig `mapstructure:"rpc"` RPC *RPCConfig `mapstructure:"rpc"`
P2P *P2PConfig `mapstructure:"p2p"` P2P *P2PConfig `mapstructure:"p2p"`
Mempool *MempoolConfig `mapstructure:"mempool"` Mempool *MempoolConfig `mapstructure:"mempool"`
FastSyncParams *FastSyncParamsConfig `mapstructure:"fastsync"` FastSync *FastSyncConfig `mapstructure:"fastsync"`
Consensus *ConsensusConfig `mapstructure:"consensus"` Consensus *ConsensusConfig `mapstructure:"consensus"`
TxIndex *TxIndexConfig `mapstructure:"tx_index"` TxIndex *TxIndexConfig `mapstructure:"tx_index"`
Instrumentation *InstrumentationConfig `mapstructure:"instrumentation"` Instrumentation *InstrumentationConfig `mapstructure:"instrumentation"`
@ -77,7 +77,7 @@ func DefaultConfig() *Config {
RPC: DefaultRPCConfig(), RPC: DefaultRPCConfig(),
P2P: DefaultP2PConfig(), P2P: DefaultP2PConfig(),
Mempool: DefaultMempoolConfig(), Mempool: DefaultMempoolConfig(),
FastSyncParams: DefaultFastSyncParamsConfig(), FastSync: DefaultFastSyncConfig(),
Consensus: DefaultConsensusConfig(), Consensus: DefaultConsensusConfig(),
TxIndex: DefaultTxIndexConfig(), TxIndex: DefaultTxIndexConfig(),
Instrumentation: DefaultInstrumentationConfig(), Instrumentation: DefaultInstrumentationConfig(),
@ -91,7 +91,7 @@ func TestConfig() *Config {
RPC: TestRPCConfig(), RPC: TestRPCConfig(),
P2P: TestP2PConfig(), P2P: TestP2PConfig(),
Mempool: TestMempoolConfig(), Mempool: TestMempoolConfig(),
FastSyncParams: TestFastSyncParamsConfig(), FastSync: TestFastSyncConfig(),
Consensus: TestConsensusConfig(), Consensus: TestConsensusConfig(),
TxIndex: TestTxIndexConfig(), TxIndex: TestTxIndexConfig(),
Instrumentation: TestInstrumentationConfig(), Instrumentation: TestInstrumentationConfig(),
@ -123,7 +123,7 @@ func (cfg *Config) ValidateBasic() error {
if err := cfg.Mempool.ValidateBasic(); err != nil { if err := cfg.Mempool.ValidateBasic(); err != nil {
return errors.Wrap(err, "Error in [mempool] section") return errors.Wrap(err, "Error in [mempool] section")
} }
if err := cfg.FastSyncParams.ValidateBasic(); err != nil { if err := cfg.FastSync.ValidateBasic(); err != nil {
return errors.Wrap(err, "Error in [fastsync] section") return errors.Wrap(err, "Error in [fastsync] section")
} }
if err := cfg.Consensus.ValidateBasic(); err != nil { if err := cfg.Consensus.ValidateBasic(); err != nil {
@ -157,7 +157,7 @@ type BaseConfig struct {
// If this node is many blocks behind the tip of the chain, FastSync // If this node is many blocks behind the tip of the chain, FastSync
// allows them to catchup quickly by downloading blocks in parallel // allows them to catchup quickly by downloading blocks in parallel
// and verifying their commits // and verifying their commits
FastSync bool `mapstructure:"fast_sync"` FastSyncMode bool `mapstructure:"fast_sync"`
// Database backend: goleveldb | cleveldb | boltdb // Database backend: goleveldb | cleveldb | boltdb
// * goleveldb (github.com/syndtr/goleveldb - most popular implementation) // * goleveldb (github.com/syndtr/goleveldb - most popular implementation)
@ -222,7 +222,7 @@ func DefaultBaseConfig() BaseConfig {
LogLevel: DefaultPackageLogLevels(), LogLevel: DefaultPackageLogLevels(),
LogFormat: LogFormatPlain, LogFormat: LogFormatPlain,
ProfListenAddress: "", ProfListenAddress: "",
FastSync: true, FastSyncMode: true,
FilterPeers: false, FilterPeers: false,
DBBackend: "goleveldb", DBBackend: "goleveldb",
DBPath: "data", DBPath: "data",
@ -234,7 +234,7 @@ func TestBaseConfig() BaseConfig {
cfg := DefaultBaseConfig() cfg := DefaultBaseConfig()
cfg.chainID = "tendermint_test" cfg.chainID = "tendermint_test"
cfg.ProxyApp = "kvstore" cfg.ProxyApp = "kvstore"
cfg.FastSync = false cfg.FastSyncMode = false
cfg.DBBackend = "memdb" cfg.DBBackend = "memdb"
return cfg return cfg
} }
@ -661,26 +661,27 @@ func (cfg *MempoolConfig) ValidateBasic() error {
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// FastSyncParamsConfig // FastSyncConfig
// FastSyncParamsConfig defines the configuration for the Tendermint fast sync service // FastSyncConfig defines the configuration for the Tendermint fast sync service
type FastSyncParamsConfig struct { type FastSyncConfig struct {
Version string `mapstructure:"version"` Version string `mapstructure:"version"`
} }
// DefaultFastSyncParamsConfig returns a default configuration for the fast sync service // DefaultFastSyncConfig returns a default configuration for the fast sync service
func DefaultFastSyncParamsConfig() *FastSyncParamsConfig { func DefaultFastSyncConfig() *FastSyncConfig {
return &FastSyncParamsConfig{ return &FastSyncConfig{
Version: "v0", Version: "v0",
} }
} }
// TestTxIndexConfig returns a default configuration for the transaction indexer. // TestFastSyncConfig returns a default configuration for the fast sync.
func TestFastSyncParamsConfig() *FastSyncParamsConfig { func TestFastSyncConfig() *FastSyncConfig {
return DefaultFastSyncParamsConfig() return DefaultFastSyncConfig()
} }
func (cfg *FastSyncParamsConfig) ValidateBasic() error { // ValidateBasic performs basic validation.
func (cfg *FastSyncConfig) ValidateBasic() error {
switch cfg.Version { switch cfg.Version {
case "v0": case "v0":
return nil return nil
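(Editor's sketch.) The hunk above is truncated after the "v0" case. For readability, a plausible reading of the full ValidateBasic — the "v1" case and the error text are assumptions, not verified commit content:

package config

import "fmt"

// FastSyncConfig mirrors the struct introduced above.
type FastSyncConfig struct {
	Version string `mapstructure:"version"`
}

// ValidateBasic accepts only the versions the node knows how to wire up.
// The "v1" case and the default error message are assumed, not copied.
func (cfg *FastSyncConfig) ValidateBasic() error {
	switch cfg.Version {
	case "v0", "v1":
		return nil
	default:
		return fmt.Errorf("unknown fastsync version %s", cfg.Version)
	}
}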

View File

@ -79,7 +79,7 @@ moniker = "{{ .BaseConfig.Moniker }}"
# If this node is many blocks behind the tip of the chain, FastSync # If this node is many blocks behind the tip of the chain, FastSync
# allows them to catchup quickly by downloading blocks in parallel # allows them to catchup quickly by downloading blocks in parallel
# and verifying their commits # and verifying their commits
fast_sync = {{ .BaseConfig.FastSync }} fast_sync = {{ .BaseConfig.FastSyncMode }}
# Database backend: goleveldb | cleveldb | boltdb # Database backend: goleveldb | cleveldb | boltdb
# * goleveldb (github.com/syndtr/goleveldb - most popular implementation) # * goleveldb (github.com/syndtr/goleveldb - most popular implementation)
@ -292,7 +292,7 @@ cache_size = {{ .Mempool.CacheSize }}
# Fast Sync version to use: # Fast Sync version to use:
# 1) "v0" (default) - the legacy fast sync implementation # 1) "v0" (default) - the legacy fast sync implementation
# 2) "v1" - refactor of v0 version for better testability # 2) "v1" - refactor of v0 version for better testability
version = "{{ .FastSyncParams.Version }}" version = "{{ .FastSync.Version }}"
##### consensus configuration options ##### ##### consensus configuration options #####
[consensus] [consensus]
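(Editor's sketch.) A hedged usage example of the renamed [fastsync] section from the Go side: the defaults, field names, and ValidateBasic come from the diff; the package name, function name, and error handling are illustrative only.

package example

import (
	"fmt"

	cfg "github.com/tendermint/tendermint/config"
)

// selectFastSyncV1 opts a default config into the refactored reactor and
// validates it, exercising the [fastsync] section shown in the template above.
func selectFastSyncV1() (*cfg.Config, error) {
	conf := cfg.DefaultConfig() // FastSync.Version defaults to "v0"
	conf.FastSync.Version = "v1"
	if err := conf.ValidateBasic(); err != nil {
		return nil, fmt.Errorf("invalid config: %v", err)
	}
	return conf, nil
}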

View File

@ -329,13 +329,13 @@ func createBlockchainReactor(config *cfg.Config,
fastSync bool, fastSync bool,
logger log.Logger) (bcReactor p2p.Reactor, err error) { logger log.Logger) (bcReactor p2p.Reactor, err error) {
switch config.FastSyncParams.Version { switch config.FastSync.Version {
case "v0": case "v0":
bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor = bcv0.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
case "v1": case "v1":
bcReactor = bcv1.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor = bcv1.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
default: default:
return nil, fmt.Errorf("unknown fastsync version %s", config.FastSyncParams.Version) return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
} }
bcReactor.SetLogger(logger.With("module", "blockchain")) bcReactor.SetLogger(logger.With("module", "blockchain"))
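(Editor's sketch.) Both this factory and makeNodeInfo further down switch on config.FastSync.Version, so the accepted values have to stay aligned with FastSyncConfig.ValidateBasic. A guard test for that invariant might look like the following — the test name, package, and the assumption that ValidateBasic rejects unknown versions are the editor's, not the commit's:

package config_test

import (
	"testing"

	cfg "github.com/tendermint/tendermint/config"
)

// TestFastSyncVersions checks that the versions dispatched on in node.go
// ("v0", "v1") are exactly the ones the config layer accepts.
func TestFastSyncVersions(t *testing.T) {
	for _, v := range []string{"v0", "v1"} {
		c := cfg.DefaultFastSyncConfig()
		c.Version = v
		if err := c.ValidateBasic(); err != nil {
			t.Errorf("version %q handled by node.go but rejected by config: %v", v, err)
		}
	}
	c := cfg.DefaultFastSyncConfig()
	c.Version = "v9"
	if err := c.ValidateBasic(); err == nil {
		t.Errorf("version %q accepted by config but not handled by node.go", c.Version)
	}
}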
@ -562,7 +562,7 @@ func NewNode(config *cfg.Config,
// Decide whether to fast-sync or not // Decide whether to fast-sync or not
// We don't fast-sync when the only validator is us. // We don't fast-sync when the only validator is us.
fastSync := config.FastSync && !onlyValidatorIsUs(state, privValidator) fastSync := config.FastSyncMode && !onlyValidatorIsUs(state, privValidator)
csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID) csMetrics, p2pMetrics, memplMetrics, smMetrics := metricsProvider(genDoc.ChainID)
@ -999,13 +999,13 @@ func makeNodeInfo(
} }
var bcChannel byte var bcChannel byte
switch config.FastSyncParams.Version { switch config.FastSync.Version {
case "v0": case "v0":
bcChannel = bcv0.BlockchainChannel bcChannel = bcv0.BlockchainChannel
case "v1": case "v1":
bcChannel = bcv1.BlockchainChannel bcChannel = bcv1.BlockchainChannel
default: default:
return nil, fmt.Errorf("unknown fastsync version %s", config.FastSyncParams.Version) return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
} }
nodeInfo := p2p.DefaultNodeInfo{ nodeInfo := p2p.DefaultNodeInfo{