diff --git a/blockchain/v2/scheduler.go b/blockchain/v2/scheduler.go index 6ad58e5c..4edfa60c 100644 --- a/blockchain/v2/scheduler.go +++ b/blockchain/v2/scheduler.go @@ -7,17 +7,49 @@ import ( "time" "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/types" ) type height int64 +// errors +var ( + // internal to the package + errNoErrorFinished = errors.New("fast sync is finished") + errInvalidEvent = errors.New("invalid event in current state") + errMissingBlock = errors.New("missing blocks") + errNilPeerForBlockRequest = errors.New("peer for block request does not exist in the switch") + errSendQueueFull = errors.New("block request not made, send-queue is full") + errPeerTooShort = errors.New("peer height too low, old peer removed/ new peer not added") + errSwitchRemovesPeer = errors.New("switch is removing peer") + errTimeoutEventWrongState = errors.New("timeout event for a state different than the current one") + errNoTallerPeer = errors.New("fast sync timed out on waiting for a peer taller than this node") + + // reported eventually to the switch + errPeerLowersItsHeight = errors.New("fast sync peer reports a height lower than previous") // handle return + errNoPeerResponseForCurrentHeights = errors.New("fast sync timed out on peer block response for current heights") // handle return + errNoPeerResponse = errors.New("fast sync timed out on peer block response") // xx + errBadDataFromPeer = errors.New("fast sync received block from wrong peer or block is bad") // xx + errDuplicateBlock = errors.New("fast sync received duplicate block from peer") + errBlockVerificationFailure = errors.New("fast sync block verification failure") // xx + errSlowPeer = errors.New("fast sync peer is not sending us data fast enough") // xx + +) + +type Event interface{} +type schedulerErrorEv struct { + peerID p2p.ID + error error +} + type blockState int const ( - blockStateNew = iota - blockStatePending, - blockStateReceived, - blockStateProcessed + blockStateUnknown = iota, + blockStateNew, + blockStatePending, + blockStateReceived, + blockStateProcessed ) type scBlockRequestMessage struct { @@ -52,9 +84,6 @@ type schedule struct { // a list of blocks in which blockState blockStates map[height]blockState - // a map of which blocks are available from which peers - blockPeers map[height]map[p2p.ID]scPeer - // a map of peerID to schedule specific peer struct `scPeer` peers map[p2p.ID]scPeer @@ -100,7 +129,6 @@ func (sc *schedule) removePeer(peerID p2p.ID) error { return errors.New("try to remove a removed peer") } - // cleanup blockStates & pending for height, pendingPeerID := range sc.pending { if peerID == pendingPeerID { delete(sc.pending[height]) @@ -113,7 +141,7 @@ func (sc *schedule) removePeer(peerID p2p.ID) error { return nil } -func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) { +func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error { if ok, peer := sc.peers[peerID]; !ok || peer.state != peerStateRemoved { return errors.New("new peer not found") } @@ -128,9 +156,32 @@ func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) { if !sc.blockState[i] { sc.blockState[i] = blockStateNew } - - sc.blockPeers[i][peerID] = peer } + + return nil +} + +func (sc *schedule) getStateAtHeight(height int64) blockState { + if ok, state := sc.blockState[height]; ok { + return state + } else { + return blockStateUnknown + } +} + +func (sc *schedule) getPeersAtHeight(height int64) []scPeer { + peers := []scPeer{} + for perrID, peer := range sc.peers { + if peer.height >= height { + peers = append(peers, peer) + } + } + + return peers +} + +func (sc *schedule) setStateAtHeight(height int64, state blockState) { + sc.blockStates[height] = state } func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64) error { @@ -138,7 +189,7 @@ func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64) error return errors.New("received a block from an unknown peer") } - if blockState[height] != blockStatePending { + if sc.getStateAtHeight(height) != blockStatePending { return errors.New("received an unrequested block") } @@ -149,18 +200,32 @@ func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64) error return errors.New("Didn't request this block from this peer") } - blockState[i] = blockStateReceived + sc.setStateAtHeight(height, blockStateReceived) delete(sc.pending[height]) return nil } +func (sc *schedule) markPending(peerID p2p.ID, height int64, size int64) error { + sc.setStateAtHeight(height, blockStatePending) + sc.pending[height] = peer +} + func (sc *schedule) markProcessed(height int64) error { - if sc.blockState[height] != received { + if sc.getStateAtHeight(height) != blockStateReceived { return errors.New("Block was processed without being received") } - sc.blockState[height] = blockStateProcessed + sc.setStateAtHeight(height, blockStateProcessed) +} + +func (sc *schedule) allBlocksProcessed() bool { + for height, state := range sc.blockStates { + if state != blockStateProcessed { + return false + } + } + return true } // heighter block | state == blockStateNew @@ -202,7 +267,7 @@ func (sc *schedule) resetBlocks(peerID p2p.ID) error { } func (sc *schedule) selectPeer(peers []scPeer) scPeer { - // TODO + // FIXME: properPeerSelector r.Intn(len(reasons)) s := rand.NewSource(time.Now().Unix()) r := rand.New(s) @@ -225,67 +290,121 @@ func (sc *schedule) numBlockInState(targetState blockState) uint32 { return num } -func (sc *schedule) popSchedule(maxRequest int) []scBlockRequestMessage { - // We only want to schedule requests such that we have less than sc.targetPending and sc.targetReceived - // This ensures we don't saturate the network or flood the processor with unprocessed blocks - pendingBlock := sc.numBlockInState(blockStatePending) - receivedBlocks := sc.numBlockInState(blockStateReceived) - todo := math.Min(sc.targetPending-pendingBlocks, sc.targetReceived-receivedBlocks) +type Scheduler struct { + sc schedule +} + +func NewScheduler(minHeight int64, targetPending uint, targetReceived uint) *Scheduler { + return &Scheduler{ + sc: newSchedule(minHeight, targetPending, targetReceived), + } +} + +func (sdr *Scheduler) handleAddPeer(peerID p2p.ID) []Event { + err := sdr.sc.addPeer(peerID) + if err != nil { + []Event{schedulerErrorEv{peerID, err}} + } + + return []Event{} +} + +func (sdr *Scheduler) handleRemovePeer(peerID p2p.ID) []Events { + err := sdr.sc.removePeer(peerID) + if err != nil { + []Event{schedulerErrorEv{peerID, err}} + } + + return []Event{} +} + +func (sdr *Scheduler) handleStatusResponse(peerID p2p.ID) []Event { + err := sdr.sc.touchPeer(peerID, time) + if err != nil { + return []Event{schedulerErrorEv{peerID, err}} + } + + err = sdr.sc.setPeerHeight(peerID, height) + if err != nil { + return []Event{schedulerErrorEv{peerID, err}} + } + + return []Event{} +} + +type bcBlockResponseEv struct { + peerID p2p.ID + height int64 + block *types.Block + msgSize int64 +} + +func (sdr *Scheduler) handleBlockResponse(peerID p2p.ID, msg *bcBlockResponse) []Event { + err := scr.sc.touchPeer(peerID, time) + if err != nil { + return []Event{schedulerErrorEv{peerID, err}} + } + + err = sdr.sc.markReceived(peerID, height, msg) + if err != nil { + return []Event{schedulerErrorEv{peerID, err}} + } + + return []Event{scBlockReceivedEv{peerID}} +} + +type scFinishedEv struct{} + +func (sdr *Scheduler) handleBlockProcessed(peerID p2p.ID, height int64) []Events { + err = sdr.sc.markProcessed(peerID, height) + if err != nil { + return []Event{schedulerErrorEv{peerID, err}} + } + + if sdr.sc.allBlocksProcessed() { + return []Event{scFinishedEv{}} + } + + return []Event{} +} + +func (sdr *Scheduler) handleBlockProcessError(peerID p2p.ID, height int64) []Event { + // remove the peer + sc.removePeer(peerID) + // reSchdule all the blocks we are waiting + /* + XXX: This is wrong as we need to + foreach block where state != blockStateProcessed + state => blockStateNew + */ + sc.resetBlocks(peerID) + + return []Event{} +} + +func (sdr *Scheduler) handleTimeCheck(peerID p2p.ID) []Events { + // prune peers + // TODO + + // produce new schedule events := []scBlockRequestMessage{} - for height := sc.minHeight(); height < sc.maxHeight(); height++ { + pendingBlock := sdr.sc.numBlockInState(blockStatePending) + receivedBlocks := sdr.sc.numBlockInState(blockStateReceived) + todo := math.Min(sdr.targetPending-pendingBlocks, sdr.targetReceived-receivedBlocks) + for height := sdr.sc.minHeight(); height <= sdr.sc.maxHeight(); height++ { if todo == 0 { break } - if blockStates[height] == blockStateNew { - peer = sc.selectPeer(blockPeers[i]) - sc.blockStates[height] = blockStatePending - sc.pending[height] = peer - events = append(events, scBlockRequestMessage{peerID: peer.peerID, height: i}) + if sdr.sc.getStateAt(height) == blockStateNew { + allPeers := sdr.sc.getPeersAtHeight(height) + bestPeer := sdr.sc.selectPeer(allPeers) + err := sc.markPending(peerID, height) + if err != nil { + // TODO + } + events = append(events, scBlockRequestMessage{peerID: bestPeer.peerID, height: height}) todo-- } } return events } - -type Scheduler struct { - sc schedule -} - -// What should the scheduler_test look like? - -/* -func addPeer(peerID) { - schedule.addPeer(peerID) -} - -func handleStatusResponse(peerID, height, time) { - schedule.touchPeer(peerID, time) - schedule.setPeerHeight(peerID, height) -} - -func handleBlockResponseMessage(peerID, height, block, time) { - schedule.touchPeer(peerID, time) - schedule.markReceived(peerID, height, size(block)) -} - -func handleNoBlockResponseMessage(peerID, height, time) { - schedule.touchPeer(peerID, time) - // reschedule that block, punish peer... -} - -func handleTimeCheckEv(time) { - // clean peer list - - events = [] - for peerID := range schedule.peersTouchedSince(time) { - pending = schedule.pendingFrom(peerID) - schedule.setPeerState(peerID, timedout) - schedule.resetBlocks(pending) - events = append(events, peerTimeout{peerID}) - } - - events = append(events, schedule.getSchedule()) - - return events -} -*/