This commit is contained in:
Anca Zamfir
2019-05-07 21:06:43 -04:00
parent 61a722a6c4
commit af90cb4f2d
6 changed files with 194 additions and 123 deletions

8
Gopkg.lock generated
View File

@ -170,9 +170,12 @@
revision = "b84e30acd515aadc4b783ad4ff83aff3299bdfe0"
[[projects]]
digest = "1:c568d7727aa262c32bdf8a3f7db83614f7af0ed661474b24588de635c20024c7"
digest = "1:53e8c5c79716437e601696140e8b1801aae4204f4ec54a504333702a49572c4f"
name = "github.com/magiconair/properties"
packages = ["."]
packages = [
".",
"assert",
]
pruneopts = "UT"
revision = "c2353362d570a7bfa228149c62842019201cfb71"
version = "v1.8.0"
@ -516,6 +519,7 @@
"github.com/golang/protobuf/ptypes/timestamp",
"github.com/gorilla/websocket",
"github.com/jmhodges/levigo",
"github.com/magiconair/properties/assert",
"github.com/pkg/errors",
"github.com/prometheus/client_golang/prometheus",
"github.com/prometheus/client_golang/prometheus/promhttp",

View File

@ -30,16 +30,16 @@ var (
)
type bpPeer struct {
logger log.Logger
id p2p.ID
recvMonitor *flow.Monitor
height int64 // the peer reported height
numPending int32 // number of requests pending assignment or block response
blocks map[int64]*types.Block // blocks received or waiting to be received from this peer
numPending int32 // number of requests still waiting for block responses
blocks map[int64]*types.Block // blocks received or expected to be received from this peer
timeout *time.Timer
didTimeout bool
recvMonitor *flow.Monitor
logger log.Logger
errFunc func(err error, peerID p2p.ID) // function to call on error
}

View File

@ -239,7 +239,14 @@ func (pool *blockPool) sendRequest(height int64) bool {
if peer.height < height {
continue
}
// Log with Info if the request is made for current processing height, Debug otherwise
if height == pool.height {
pool.logger.Info("assign request to peer", "peer", peer.id, "height", height)
} else {
pool.logger.Debug("assign request to peer", "peer", peer.id, "height", height)
}
if err := pool.toBcR.sendBlockRequest(peer.id, height); err == errNilPeerForBlockRequest {
pool.removePeer(peer.id, err)
}
@ -279,7 +286,7 @@ func (pool *blockPool) addBlock(peerID p2p.ID, block *types.Block, blockSize int
pool.blocks[block.Height] = peerID
pool.numPending--
pool.peers[peerID].decrPending(blockSize)
pool.logger.Debug("added new block", "height", block.Height, "from_peer", peerID, "total", len(pool.blocks))
pool.logger.Info("added new block", "height", block.Height, "from_peer", peerID, "total", len(pool.blocks))
return nil
}
@ -305,9 +312,6 @@ func (pool *blockPool) getNextTwoBlocks() (first, second *blockData, err error)
if err == nil {
err = err2
}
pool.logger.Debug("blocks at height and/ or height+1", "first", first, "second", second)
return
}
@ -336,10 +340,12 @@ func (pool *blockPool) processedCurrentHeightBlock() {
}
func (pool *blockPool) removePeerAtCurrentHeight(err error) {
first, err := pool.getBlockAndPeerAtHeight(pool.height)
if err == nil {
pool.removePeer(first.peer.id, err)
peerID := pool.blocks[pool.height]
peer := pool.peers[peerID]
if peer == nil {
return
}
pool.removePeer(peer.id, err)
}
func (pool *blockPool) cleanup() {

View File

@ -21,10 +21,6 @@ const (
trySyncIntervalMS = 10
trySendIntervalMS = 10
// stop syncing when last block's time is
// within this much of the system time.
// stopSyncingDurationMinutes = 10
// ask for best height every 10s
statusUpdateIntervalSeconds = 10
// check if we should switch to consensus reactor
@ -49,15 +45,6 @@ type consensusReactor interface {
SwitchToConsensus(sm.State, int)
}
type peerError struct {
err error
peerID p2p.ID
}
func (e peerError) Error() string {
return fmt.Sprintf("error with peer %v: %s", e.peerID, e.err.Error())
}
// BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct {
p2p.BaseReactor
@ -75,15 +62,15 @@ type BlockchainReactor struct {
blocksSynced int
// Receive goroutine forwards messages to this channel to be processed in the context of the poolRoutine.
messagesForFSMCh chan bReactorMessageData
messagesForFSMCh chan bcReactorMessage
// Switch goroutine may send RemovePeer to the blockchain reactor. This is an error message that is relayed
// to this channel to be processed in the context of the poolRoutine.
errorsForFSMCh chan bReactorMessageData
errorsForFSMCh chan bcReactorMessage
// This channel is used by the FSM and indirectly the block pool to report errors to the blockchain reactor and
// the switch.
errorsFromFSMCh chan bReactorMessageData
eventsFromFSMCh chan bcFsmMessage
}
type BlockRequest struct {
@ -91,12 +78,31 @@ type BlockRequest struct {
PeerID p2p.ID
}
// bReactorMessageData structure is used by the reactor when sending messages to the FSM.
type bReactorMessageData struct {
// bcReactorMessage is used by the reactor to send messages to the FSM.
// A pointer to it is what bReactorFSM.handle() receives.
type bcReactorMessage struct {
// event is the event delivered to the FSM.
event bReactorEvent
// data is the payload that accompanies the event.
data bReactorEventData
}
// bFsmEvent identifies the kind of event the FSM reports back to the reactor
// inside a bcFsmMessage.
type bFsmEvent uint

const (
	// message type events

	// peerErrorEv is queued by sendPeerError when an error is reported for a peer.
	// The constants are explicitly typed as bFsmEvent (they were untyped integer
	// constants before) so they cannot be silently mixed with other event enums.
	// Values start at 1, leaving the zero value of bFsmEvent unassigned.
	peerErrorEv bFsmEvent = iota + 1
	// syncFinishedEv is queued by switchToConsensus after the hand-over to the
	// consensus reactor.
	syncFinishedEv
)
// bFsmEventData is the payload of a bcFsmMessage.
type bFsmEventData struct {
// peerID is the ID of the peer the event refers to.
peerID p2p.ID
// err is the error reported for that peer.
err error
}
// bcFsmMessage is used by the FSM to send messages to the reactor.
// Messages of this type travel over the reactor's eventsFromFSMCh channel.
type bcFsmMessage struct {
// event is the kind of event being reported.
event bFsmEvent
// data is the payload that accompanies the event.
data bFsmEventData
}
// NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *BlockStore,
fastSync bool) *BlockchainReactor {
@ -107,9 +113,9 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl
}
const capacity = 1000
errorsFromFSMCh := make(chan bReactorMessageData, capacity)
messagesForFSMCh := make(chan bReactorMessageData, capacity)
errorsForFSMCh := make(chan bReactorMessageData, capacity)
eventsFromFSMCh := make(chan bcFsmMessage, capacity)
messagesForFSMCh := make(chan bcReactorMessage, capacity)
errorsForFSMCh := make(chan bcReactorMessage, capacity)
bcR := &BlockchainReactor{
initialState: state,
@ -118,7 +124,7 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl
fastSync: fastSync,
store: store,
messagesForFSMCh: messagesForFSMCh,
errorsFromFSMCh: errorsFromFSMCh,
eventsFromFSMCh: eventsFromFSMCh,
errorsForFSMCh: errorsForFSMCh,
}
fsm := NewFSM(store.Height()+1, bcR)
@ -162,17 +168,16 @@ func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
// AddPeer implements Reactor by sending our state to peer.
func (bcR *BlockchainReactor) AddPeer(peer p2p.Peer) {
msgBytes := cdc.MustMarshalBinaryBare(&bcStatusResponseMessage{bcR.store.Height()})
if !peer.Send(BlockchainChannel, msgBytes) {
if !peer.TrySend(BlockchainChannel, msgBytes) {
// doing nothing, will try later in `poolRoutine`
}
// peer is added to the pool once we receive the first
// bcStatusResponseMessage from the peer and call pool.SetPeerHeight
// bcStatusResponseMessage from the peer and call pool.updatePeer()
}
// respondToPeer loads a block and sends it to the requesting peer,
// if we have it. Otherwise, we'll respond saying we don't have it.
// According to the Tendermint spec, if all nodes are honest,
// no node should be requesting for a block that's non-existent.
// sendBlockToPeer loads a block and sends it to the requesting peer.
// If the block doesn't exist a bcNoBlockResponseMessage is sent.
// If all nodes are honest, no node should be requesting a block that doesn't exist.
func (bcR *BlockchainReactor) sendBlockToPeer(msg *bcBlockRequestMessage,
src p2p.Peer) (queued bool) {
@ -193,14 +198,9 @@ func (bcR *BlockchainReactor) sendStatusResponseToPeer(msg *bcStatusRequestMessa
return src.TrySend(BlockchainChannel, msgBytes)
}
func (bcR *BlockchainReactor) sendMessageToFSMAsync(msg bReactorMessageData) {
bcR.Logger.Debug("send message to FSM for processing", "msg", msg.String())
bcR.messagesForFSMCh <- msg
}
// RemovePeer implements Reactor by removing peer from the pool.
func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
msgData := bReactorMessageData{
msgData := bcReactorMessage{
event: peerRemoveEv,
data: bReactorEventData{
peerId: peer.ID(),
@ -242,7 +242,7 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
}
case *bcBlockResponseMessage:
msgData := bReactorMessageData{
msgData := bcReactorMessage{
event: blockResponseEv,
data: bReactorEventData{
peerId: src.ID(),
@ -251,11 +251,11 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
length: len(msgBytes),
},
}
bcR.sendMessageToFSMAsync(msgData)
bcR.sendMessageToFSM(msgData)
case *bcStatusResponseMessage:
// Got a peer status. Unverified.
msgData := bReactorMessageData{
msgData := bcReactorMessage{
event: statusResponseEv,
data: bReactorEventData{
peerId: src.ID(),
@ -263,15 +263,19 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
length: len(msgBytes),
},
}
bcR.sendMessageToFSMAsync(msgData)
bcR.sendMessageToFSM(msgData)
default:
bcR.Logger.Error(fmt.Sprintf("unknown message type %v", reflect.TypeOf(msg)))
}
}
// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
// sendMessageToFSM logs msg at debug level and then queues it on
// messagesForFSMCh, from where it is processed in the context of the poolRoutine.
// The send blocks while the channel is full.
func (bcR *BlockchainReactor) sendMessageToFSM(msg bcReactorMessage) {
	description := msg.String()
	bcR.Logger.Debug("send message to FSM for processing", "msg", description)

	bcR.messagesForFSMCh <- msg
}
// poolRoutine receives and handles messages from the Receive() routine and from the FSM
func (bcR *BlockchainReactor) poolRoutine() {
bcR.fsm.start()
@ -297,7 +301,7 @@ ForLoop:
// - the number of blocks received and waiting to be processed,
// - the number of blockResponse messages waiting in messagesForFSMCh, etc.
// Currently maxNumPendingRequests value is not changed.
_ = bcR.fsm.handle(&bReactorMessageData{
_ = bcR.fsm.handle(&bcReactorMessage{
event: makeRequestsEv,
data: bReactorEventData{
maxNumRequests: maxNumPendingRequests}})
@ -325,12 +329,12 @@ ForLoop:
}
case <-doProcessBlockCh:
err := bcR.processBlocksFromPoolRoutine()
err := bcR.processBlock()
if err == errMissingBlocks {
continue ForLoop
continue
}
// Notify FSM of block processing result.
_ = bcR.fsm.handle(&bReactorMessageData{
_ = bcR.fsm.handle(&bcReactorMessage{
event: processedBlockEv,
data: bReactorEventData{
err: err,
@ -338,7 +342,7 @@ ForLoop:
})
if err == errBlockVerificationFailure {
continue ForLoop
continue
}
doProcessBlockCh <- struct{}{}
@ -357,10 +361,21 @@ ForLoop:
case msg := <-bcR.errorsForFSMCh:
_ = bcR.fsm.handle(&msg)
case msg := <-bcR.errorsFromFSMCh:
bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerId)
case msg := <-bcR.eventsFromFSMCh:
switch msg.event {
case peerErrorEv:
bcR.reportPeerErrorToSwitch(msg.data.err, msg.data.peerID)
if msg.data.err == errNoPeerResponse {
_ = bcR.fsm.handle(&msg)
_ = bcR.fsm.handle(&bcReactorMessage{
event: peerRemoveEv,
data: bReactorEventData{
peerId: msg.data.peerID,
err: msg.data.err,
},
})
}
default:
bcR.Logger.Error("Event from FSM not supported", "type", msg.event)
}
case <-bcR.Quit():
@ -376,23 +391,8 @@ func (bcR *BlockchainReactor) reportPeerErrorToSwitch(err error, peerID p2p.ID)
}
}
// Called by FSM and pool:
// - pool calls when it detects slow peer or when peer times out
// - FSM calls when:
// - adding a block (addBlock) fails
// - reactor processing of a block reports failure and FSM sends back the peers of first and second blocks
func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) {
msgData := bReactorMessageData{
event: peerRemoveEv,
data: bReactorEventData{
peerId: peerID,
err: err,
},
}
bcR.errorsFromFSMCh <- msgData
}
func (bcR *BlockchainReactor) processBlock() error {
func (bcR *BlockchainReactor) processBlocksFromPoolRoutine() error {
firstBP, secondBP, err := bcR.fsm.pool.getNextTwoBlocks()
if err != nil {
// We need both to sync the first block.
@ -429,31 +429,14 @@ func (bcR *BlockchainReactor) processBlocksFromPoolRoutine() error {
return nil
}
func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) {
if timer == nil {
panic("nil timer pointer parameter")
}
if *timer == nil {
*timer = time.AfterFunc(timeout, func() {
msg := bReactorMessageData{
event: stateTimeoutEv,
data: bReactorEventData{
stateName: name,
},
}
bcR.sendMessageToFSMAsync(msg)
})
} else {
(*timer).Reset(timeout)
}
}
// BroadcastStatusRequest broadcasts `BlockStore` height.
// Implements bcRMessageInterface
// sendStatusRequest broadcasts a status request carrying the current
// `BlockStore` height on the blockchain channel.
func (bcR *BlockchainReactor) sendStatusRequest() {
	statusRequest := bcStatusRequestMessage{bcR.store.Height()}
	encoded := cdc.MustMarshalBinaryBare(&statusRequest)
	bcR.Switch.Broadcast(BlockchainChannel, encoded)
}
// Implements bcRMessageInterface
// sendBlockRequest sends a block request for the given height to the peer with the given ID.
func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) error {
peer := bcR.Switch.Peers().Get(peerID)
@ -470,15 +453,52 @@ func (bcR *BlockchainReactor) sendBlockRequest(peerID p2p.ID, height int64) erro
}
// switchToConsensus hands the current state and the number of synced blocks to
// the consensus reactor registered with the switch, then queues a
// syncFinishedEv on eventsFromFSMCh. Nothing happens when no consensus reactor
// is registered.
//
// NOTE(review): the poolRoutine switch visible in this change only has a case
// for peerErrorEv; confirm syncFinishedEv is handled and does not fall through
// to the "Event from FSM not supported" error log.
func (bcR *BlockchainReactor) switchToConsensus() {
	consensusR, found := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
	if !found {
		// Should only happen during testing.
		return
	}

	consensusR.SwitchToConsensus(bcR.state, bcR.blocksSynced)
	bcR.eventsFromFSMCh <- bcFsmMessage{event: syncFinishedEv}
}
// Implements bcRMessageInterface
// sendPeerError reports err for the peer identified by peerID by queueing a
// peerErrorEv message on eventsFromFSMCh. The send blocks while the channel is full.
// Called by FSM and pool:
// - pool calls when it detects slow peer or when peer times out
// - FSM calls when:
//   - adding a block (addBlock) fails
//   - reactor processing of a block reports failure and FSM sends back the peers of first and second blocks
func (bcR *BlockchainReactor) sendPeerError(err error, peerID p2p.ID) {
	bcR.eventsFromFSMCh <- bcFsmMessage{
		event: peerErrorEv,
		data:  bFsmEventData{peerID: peerID, err: err},
	}
}
// Implements bcRMessageInterface
// resetStateTimer (re)arms the timer for the FSM state called name.
// If *timer is already set it is reset to fire after timeout; otherwise a new
// timer is created that, on expiry, sends a stateTimeoutEv for that state name
// to the FSM. Panics when the timer pointer itself is nil.
func (bcR *BlockchainReactor) resetStateTimer(name string, timer **time.Timer, timeout time.Duration) {
	if timer == nil {
		panic("nil timer pointer parameter")
	}

	// An existing timer is simply re-armed.
	if existing := *timer; existing != nil {
		existing.Reset(timeout)
		return
	}

	// First use: create the timer and store it through the caller's pointer.
	onTimeout := func() {
		bcR.sendMessageToFSM(bcReactorMessage{
			event: stateTimeoutEv,
			data:  bReactorEventData{stateName: name},
		})
	}
	*timer = time.AfterFunc(timeout, onTimeout)
}
//-----------------------------------------------------------------------------
// Messages

View File

@ -78,7 +78,7 @@ const (
stateTimeoutEv
)
func (msg *bReactorMessageData) String() string {
func (msg *bcReactorMessage) String() string {
var dataStr string
switch msg.event {
@ -139,10 +139,13 @@ var (
finished *bReactorFSMState
)
// state timers
// Timeouts used by the FSM. They are declared as variables, not constants.
// NOTE(review): presumably so tests can shorten them — confirm.
var (
// timeouts for state timers
waitForPeerTimeout = 2 * time.Second
waitForBlockAtCurrentHeightTimeout = 5 * time.Second
// overall timer for fast sync; isCaughtUp compares the time elapsed since the
// FSM start time against this value.
fastSyncNoProgressTimeout = 5 * time.Second
)
// errors
@ -158,7 +161,7 @@ var (
errPeerTooShort = errors.New("peer height too low, old peer removed/ new peer not added")
errSlowPeer = errors.New("peer is not sending us data fast enough")
errSwitchRemovesPeer = errors.New("switch is removing peer")
errTimeoutEventWrongState = errors.New("timeout event for a different state than the current one")
errTimeoutEventWrongState = errors.New("timeout event for a state different than the current one")
)
func init() {
@ -260,9 +263,9 @@ func init() {
} else {
fsm.pool.processedCurrentHeightBlock()
if fsm.pool.reachedMaxHeight() {
fsm.stop()
return finished, nil
}
// Since we advanced one block reset the state timer
fsm.resetStateTimer()
}
@ -270,6 +273,9 @@ func init() {
case peerRemoveEv:
// This event is sent by the switch to remove disconnected and errored peers.
if len(fsm.pool.peers) == 0 {
return waitForPeer, nil
}
fsm.pool.removePeer(data.peerId, data.err)
return waitForBlock, nil
@ -328,18 +334,18 @@ func (fsm *bReactorFSM) setLogger(l log.Logger) {
fsm.pool.setLogger(l)
}
// Starts the FSM goroutine.
// Starts the FSM.
func (fsm *bReactorFSM) start() {
_ = fsm.handle(&bReactorMessageData{event: startFSMEv})
_ = fsm.handle(&bcReactorMessage{event: startFSMEv})
}
// Stops the FSM goroutine.
// Stops the FSM.
func (fsm *bReactorFSM) stop() {
_ = fsm.handle(&bReactorMessageData{event: stopFSMEv})
_ = fsm.handle(&bcReactorMessage{event: stopFSMEv})
}
// handle processes messages and events sent to the FSM.
func (fsm *bReactorFSM) handle(msg *bReactorMessageData) error {
func (fsm *bReactorFSM) handle(msg *bcReactorMessage) error {
fsm.logger.Debug("FSM received", "event", msg, "state", fsm.state)
if fsm.state == nil {
@ -380,13 +386,14 @@ func (fsm *bReactorFSM) isCaughtUp() bool {
// Some conditions to determine if we're caught up.
// Ensures we've either received a block or waited some amount of time,
// and that we're synced to the highest known height.
// Note we use maxPeerHeight - 1 because to sync block H requires block H+1
// to verify the LastCommit.
receivedBlockOrTimedOut := fsm.pool.height > 0 || time.Since(fsm.startTime) > 5*time.Second
ourChainIsLongestAmongPeers := fsm.pool.maxPeerHeight == 0 || fsm.pool.height >= (fsm.pool.maxPeerHeight-1) || fsm.state == finished
// Note we use maxPeerHeight - 1 because to sync block H requires block H+1 to verify the LastCommit.
receivedBlockOrTimedOut := fsm.pool.height > 0 || time.Since(fsm.startTime) > fastSyncNoProgressTimeout
ourChainIsLongestAmongPeers := fsm.pool.maxPeerHeight == 0 || fsm.state == finished
isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
return isCaughtUp
//return fsm.state == finished
}
func (fsm *bReactorFSM) makeNextRequests(maxNumPendingRequests int32) {

View File

@ -20,7 +20,7 @@ type testReactor struct {
}
func sendEventToFSM(fsm *bReactorFSM, ev bReactorEvent, data bReactorEventData) error {
return fsm.handle(&bReactorMessageData{event: ev, data: data})
return fsm.handle(&bcReactorMessage{event: ev, data: data})
}
var (
@ -567,7 +567,7 @@ func TestFSMBadBlockFromPeer(t *testing.T) {
}
}
func TestFSMBlockAtCurrentHeightNeverArrives(t *testing.T) {
func TestFSMBlockAtCurrentHeightDoesNotArriveInTime(t *testing.T) {
tests := []struct {
name string
startingHeight int64
@ -861,7 +861,7 @@ func TestFSMPeerStateTimeoutEvent(t *testing.T) {
steps []fsmStepTestValues
}{
{
name: "timeout event in state waitForPeer for state == waitForPeer",
name: "timeout event for state waitForPeer while in state waitForPeer",
startingHeight: 1,
maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{
@ -870,7 +870,7 @@ func TestFSMPeerStateTimeoutEvent(t *testing.T) {
},
},
{
name: "timeout event for state waitForPeer for state != waitForPeer",
name: "timeout event for state waitForPeer while in a state != waitForPeer",
startingHeight: 1,
maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{
@ -878,6 +878,40 @@ func TestFSMPeerStateTimeoutEvent(t *testing.T) {
makeStepStateTimeoutEv("waitForPeer", "waitForPeer", "waitForBlock", errTimeoutEventWrongState),
},
},
{
name: "timeout event for state waitForBlock while in state waitForBlock ",
startingHeight: 1,
maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{
makeStepStartFSMEv(),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumPendingRequests),
makeStepStateTimeoutEv("waitForBlock", "waitForPeer", "waitForBlock", errNoPeerResponse),
},
},
{
name: "timeout event for state waitForBlock while in a state != waitForBlock",
startingHeight: 1,
maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{
makeStepStartFSMEv(),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumPendingRequests),
makeStepStateTimeoutEv("waitForBlock", "waitForBlock", "waitForPeer", errTimeoutEventWrongState),
},
},
{
name: "timeout event for state waitForBlock with multiple peers",
startingHeight: 1,
maxRequestsPerPeer: 3,
steps: []fsmStepTestValues{
makeStepStartFSMEv(),
makeStepStatusEv("waitForPeer", "waitForBlock", "P1", 3, nil),
makeStepMakeRequestsEv("waitForBlock", "waitForBlock", maxNumPendingRequests),
makeStepStatusEv("waitForBlock", "waitForBlock", "P2", 3, nil),
makeStepStateTimeoutEv("waitForBlock", "waitForBlock", "waitForBlock", errNoPeerResponse),
},
},
}
for _, tt := range tests {
@ -1085,7 +1119,7 @@ func TestFSMPeerTimeout(t *testing.T) {
sendStatusResponse(fsm, peerID, 10)
time.Sleep(5 * time.Millisecond)
if err := fsm.handle(&bReactorMessageData{
if err := fsm.handle(&bcReactorMessage{
event: makeRequestsEv,
data: bReactorEventData{maxNumRequests: maxNumPendingRequests}}); err != nil {
}
@ -1151,7 +1185,7 @@ func (testR *testReactor) resetStateTimer(name string, timer **time.Timer, timeo
func sendStatusResponse(fsm *bReactorFSM, peerID p2p.ID, height int64) {
msgBytes := makeStatusResponseMessage(height)
_ = fsm.handle(&bReactorMessageData{
_ = fsm.handle(&bcReactorMessage{
event: statusResponseEv,
data: bReactorEventData{
peerId: peerID,