diff --git a/blockchain/v2/scheduler.go b/blockchain/v2/scheduler.go index 39dc76fb..3767b15a 100644 --- a/blockchain/v2/scheduler.go +++ b/blockchain/v2/scheduler.go @@ -51,11 +51,11 @@ type schedulerErrorEv struct { type blockState int const ( - blockStateUnknown = iota, - blockStateNew, - blockStatePending, - blockStateReceived, - blockStateProcessed + blockStateUnknown = iota + blockStateNew + blockStatePending + blockStateReceived + blockStateProcessed ) type scBlockRequestMessage struct { @@ -67,71 +67,75 @@ type peerState int const ( peerStateNew = iota - peerStateReady, + peerStateReady peerStateRemoved ) type scPeer struct { peerID p2p.ID - state scPeerState + state peerState + height int64 lastTouched time.Time - monitor flow.Monitor + lastRate int64 } func newScPeer(peerID p2p.ID) *scPeer { return &scPeer{ peerID: peerID, state: peerStateNew, + height: -1, } } type schedule struct { - minHeight int64 + initHeight int64 // a list of blocks in which blockState - blockStates map[height]blockState + blockStates map[int64]blockState // a map of peerID to schedule specific peer struct `scPeer` - peers map[p2p.ID]scPeer + peers map[p2p.ID]*scPeer // a map of heights to the peer we are waiting for a response from - pending map[height]p2p.ID - - targetPending uint // the number of blocks we want in blockStatePending - targetReceived uint // the number of blocks we want in blockStateReceived + pending map[int64]p2p.ID + pendingTime map[int64]time.Time peerTimeout uint peerMinSpeed uint } -func newSchedule(minHeight int64, targetPending uint, targetReceived uint) *schedule { +func newSchedule(initHeight int64) *schedule { sc := schedule{ - minHeight: minHeight, - targetPending: targetPending, - targetReceived: targetReceived, + initHeight: initHeight, } - sc.setStateAtHeight(minHeight, blockStateNew) + sc.setStateAtHeight(initHeight, blockStateNew) return &sc } func (sc *schedule) addPeer(peerID p2p.ID) error { - if ok, _ := sc.peers[peerID]; ok { + if _, ok := sc.peers[peerID]; ok { return errDuplicatePeer } - peers[peerID] = newScPeer(peerID) + sc.peers[peerID] = newScPeer(peerID) + + return nil } func (sc *schedule) touchPeer(peerID p2p.ID, time time.Time) error { - if ok, peer := sc.peers[peerID]; !ok && peer.state == peerStateRemoved { + var peer scPeer + if peer, ok := sc.peers[peerID]; !ok && peer.state == peerStateRemoved { return errPeerNotFound } peer.lastTouched = time + + return nil } func (sc *schedule) removePeer(peerID p2p.ID) error { - if ok, peer := sc.peers[peerID]; !ok { + var peer scPeer + if peer, ok := sc.peers[peerID]; !ok { return errPeerNotFound } @@ -141,7 +145,7 @@ func (sc *schedule) removePeer(peerID p2p.ID) error { for height, pendingPeerID := range sc.pending { if peerID == pendingPeerID { - delete(sc.pending[height]) + delete(sc.pending, height) sc.blockStates[height] = blockStateNew } } @@ -152,7 +156,8 @@ func (sc *schedule) removePeer(peerID p2p.ID) error { } func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error { - if ok, peer := sc.peers[peerID]; !ok || peer.state != peerStateRemoved { + var peer scPeer + if peer, ok := sc.peers[peerID]; !ok || peer.state != peerStateRemoved { return errors.New("new peer not found") } @@ -163,8 +168,8 @@ func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error { peer.height = height peer.state = peerStateReady for i := sc.minHeight(); i <= height; i++ { - if !sc.blockState[i] { - sc.blockState[i] = blockStateNew + if sc.getStateAtHeight(i) == blockStateUnknown { + sc.setStateAtHeight(i, blockStateNew) } } @@ -172,17 +177,17 @@ func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error { } func (sc *schedule) getStateAtHeight(height int64) blockState { - if height < sc.minHeight { + if height < sc.initHeight { return blockStateProcessed - } else if ok, state := sc.blockState[height]; ok { + } else if state, ok := sc.blockStates[height]; ok { return state } else { return blockStateUnknown } } -func (sc *schedule) getPeersAtHeight(height int64) []scPeer { - peers := []scPeer{} +func (sc *schedule) getPeersAtHeight(height int64) []*scPeer { + peers := []*scPeer{} for perrID, peer := range sc.peers { if peer.height >= height { peers = append(peers, peer) @@ -193,10 +198,10 @@ func (sc *schedule) getPeersAtHeight(height int64) []scPeer { } // XXX: probably needs a better name -func (sc *schedule) peersSince(duration time.Duration, now time.Time) []scPeer { - peers := []scPeer{} +func (sc *schedule) peersSince(duration time.Duration, now time.Time) []*scPeer { + peers := []*scPeer{} for id, peer := range sc.peers { - if now-peer.lastTouched > duration { + if now.Sub(peer.lastTouched) > duration { peers = append(peers, peer) } } @@ -208,8 +213,10 @@ func (sc *schedule) setStateAtHeight(height int64, state blockState) { sc.blockStates[height] = state } -func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64) error { - if ok, peer := sc.peers[peerID]; !ok || peer.state != peerStateReady { +// TODO keep track of when i received this block +func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error { + var peer scPeer + if peer, ok := sc.peers[peerID]; !ok || peer.state != peerStateReady { return errPeerNotFound } @@ -218,21 +225,29 @@ func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64) error return errBadSchedule } - // TODO: download speed - // check if the block is pending from that peer if sc.pending[height] != peerID { return errBadSchedule } + var pendingTime time.Time + if pendingTime, ok := sc.pendingTime[height]; !ok || pendingTime.Sub(now) < 0 { + return errBadSchedule + } + + peer.lastRate = size / int64(now.Sub(pendingTime).Seconds()) + sc.setStateAtHeight(height, blockStateReceived) - delete(sc.pending[height]) + delete(sc.pending, height) + delete(sc.pendingTime, height) return nil } -func (sc *schedule) markPending(peerID p2p.ID, height int64, size int64) error { - if ok, peer := sc.peers[peerID]; !ok || peer.state != peerStateReady { +// todo keep track of when i requested this block +func (sc *schedule) markPending(peerID p2p.ID, height int64, time time.Time) error { + var peer scPeer + if peer, ok := sc.peers[peerID]; !ok || peer.state != peerStateReady { return errPeerNotFound } @@ -242,7 +257,10 @@ func (sc *schedule) markPending(peerID p2p.ID, height int64, size int64) error { } sc.setStateAtHeight(height, blockStatePending) - sc.pending[height] = peer + sc.pending[height] = peerID + // XXX: to make htis more accurate we can introduce a message from + // the IO rutine which indicates the time the request was put on the wire + sc.pendingTime[height] = time return nil } @@ -253,6 +271,8 @@ func (sc *schedule) markProcessed(height int64) error { } sc.setStateAtHeight(height, blockStateProcessed) + + return nil } func (sc *schedule) allBlocksProcessed() bool { @@ -266,7 +286,7 @@ func (sc *schedule) allBlocksProcessed() bool { // heighter block | state == blockStateNew func (sc *schedule) maxHeight() int64 { - max := 0 + var max int64 = 0 for height, state := range sc.blockStates { if state == blockStateNew && height > max { max = height @@ -278,7 +298,7 @@ func (sc *schedule) maxHeight() int64 { // lowest block | state == blockStateNew func (sc *schedule) minHeight() int64 { - min := sc.minHeight + min := sc.initHeight for height, state := range sc.blockStates { if state == blockStateNew && height < min { min = height @@ -288,23 +308,36 @@ func (sc *schedule) minHeight() int64 { return min } -func (sc *schedule) pendingFrom(peerID p2p.ID) { - return sc.pending[peerID] +// XXX: THis is not yet used +func (sc *schedule) pendingFrom(peerID p2p.ID) []int64 { + heights := []int64{} + for height, pendingPeerID := range sc.pending { + if pendingPeerID == peerID { + heights = append(heights, height) + } + } + return heights } +// XXX: Split up read and write paths here func (sc *schedule) resetBlocks(peerID p2p.ID) error { - if ok, peer := sc.peers[peerID]; !ok { + var peer scPeer + if peer, ok := sc.peers[peerID]; !ok { return errPeerNotFound } - for _, height := range sc.pending[peerID] { - sc.setStateAtHeight(height, blockStateNew) + // this should use pendingFrom + for height, pendingPeerID := range sc.pending { + if pendingPeerID == peerID { + sc.setStateAtHeight(height, blockStateNew) + } } + + return nil } func (sc *schedule) selectPeer(peers []scPeer) scPeer { // FIXME: properPeerSelector - r.Intn(len(reasons)) s := rand.NewSource(time.Now().Unix()) r := rand.New(s) @@ -317,8 +350,8 @@ func (sc *schedule) prunablePeers(time time.Time, minSpeed int) []p2p.ID { } func (sc *schedule) numBlockInState(targetState blockState) uint32 { - num := 0 - for height, state := range sc.blockState { + var num uint32 = 0 + for height, state := range sc.blockStates { if state == targetState { num++ } @@ -327,45 +360,53 @@ func (sc *schedule) numBlockInState(targetState blockState) uint32 { } type Scheduler struct { - sc schedule + sc *schedule + targetPending uint // the number of blocks we want in blockStatePending + targetReceived uint // the number of blocks we want in blockStateReceived + } func NewScheduler(minHeight int64, targetPending uint, targetReceived uint) *Scheduler { return &Scheduler{ - sc: newSchedule(minHeight, targetPending, targetReceived), + sc: newSchedule(minHeight), + targetPending: targetPending, + targetReceived: targetReceived, } } -func (sdr *Scheduler) handleAddPeer(peerID p2p.ID) []Event { +type Skip struct { +} + +func (sdr *Scheduler) handleAddPeer(peerID p2p.ID) Event { err := sdr.sc.addPeer(peerID) if err != nil { - []Event{schedulerErrorEv{peerID, err}} + return schedulerErrorEv{peerID, err} } - return []Event{} + return Skip{} } -func (sdr *Scheduler) handleRemovePeer(peerID p2p.ID) []Events { +func (sdr *Scheduler) handleRemovePeer(peerID p2p.ID) Event { err := sdr.sc.removePeer(peerID) if err != nil { - []Event{schedulerErrorEv{peerID, err}} + return schedulerErrorEv{peerID, err} } - return []Event{} + return Skip{} } -func (sdr *Scheduler) handleStatusResponse(peerID p2p.ID) []Event { - err := sdr.sc.touchPeer(peerID, time) +func (sdr *Scheduler) handleStatusResponse(peerID p2p.ID, height int64, now time.Time) Event { + err := sdr.sc.touchPeer(peerID, now) if err != nil { - return []Event{schedulerErrorEv{peerID, err}} + return schedulerErrorEv{peerID, err} } err = sdr.sc.setPeerHeight(peerID, height) if err != nil { - return []Event{schedulerErrorEv{peerID, err}} + return schedulerErrorEv{peerID, err} } - return []Event{} + return Skip{} } type bcBlockResponseEv struct { @@ -375,56 +416,60 @@ type bcBlockResponseEv struct { msgSize int64 } -func (sdr *Scheduler) handleBlockResponse(peerID p2p.ID, msg *bcBlockResponse) []Event { - err := scr.sc.touchPeer(peerID, time) +type scBlockReceivedEv struct { + peerID p2p.ID +} + +func (sdr *Scheduler) handleBlockResponse(peerID p2p.ID, msg *bcBlockResponseEv, now time.Time) Event { + err := sdr.sc.touchPeer(peerID, now) if err != nil { - return []Event{schedulerErrorEv{peerID, err}} + return schedulerErrorEv{peerID, err} } - err = sdr.sc.markReceived(peerID, height, msg) + err = sdr.sc.markReceived(peerID, msg.height, msg.msgSize, now) if err != nil { - return []Event{schedulerErrorEv{peerID, err}} + return schedulerErrorEv{peerID, err} } - return []Event{scBlockReceivedEv{peerID}} + return scBlockReceivedEv{peerID} } type scFinishedEv struct{} -func (sdr *Scheduler) handleBlockProcessed(peerID p2p.ID, height int64) []Events { - err = sdr.sc.markProcessed(peerID, height) +func (sdr *Scheduler) handleBlockProcessed(peerID p2p.ID, height int64) Event { + err := sdr.sc.markProcessed(height) if err != nil { - return []Event{schedulerErrorEv{peerID, err}} + return schedulerErrorEv{peerID, err} } if sdr.sc.allBlocksProcessed() { - return []Event{scFinishedEv{}} + return scFinishedEv{} } - return []Event{} + return Skip{} } -func (sdr *Scheduler) handleBlockProcessError(peerID p2p.ID, height int64) []Event { +func (sdr *Scheduler) handleBlockProcessError(peerID p2p.ID, height int64) Event { // remove the peer - sc.removePeer(peerID) + sdr.sc.removePeer(peerID) // reSchdule all the blocks we are waiting /* XXX: This is wrong as we need to foreach block where state != blockStateProcessed state => blockStateNew */ - sc.resetBlocks(peerID) + sdr.sc.resetBlocks(peerID) - return []Event{} + return Skip{} } -func (sdr *Scheduler) handleTimeCheck(peerID p2p.ID) []Events { +func (sdr *Scheduler) handleTimeCheck(peerID p2p.ID) []Event { // prune peers // TODO // produce new schedule events := []scBlockRequestMessage{} - pendingBlock := sdr.sc.numBlockInState(blockStatePending) + pendingBlocks := sdr.sc.numBlockInState(blockStatePending) receivedBlocks := sdr.sc.numBlockInState(blockStateReceived) todo := math.Min(sdr.targetPending-pendingBlocks, sdr.targetReceived-receivedBlocks) for height := sdr.sc.minHeight(); height <= sdr.sc.maxHeight(); height++ {