WIP: progress on converting pseudocode

This commit is contained in:
Sean Braithwaite 2019-07-14 23:50:58 +03:00
parent 1b95954b88
commit c91ab0c7b0

View File

@ -51,11 +51,11 @@ type schedulerErrorEv struct {
type blockState int
const (
blockStateUnknown = iota,
blockStateNew,
blockStatePending,
blockStateReceived,
blockStateProcessed
blockStateUnknown = iota
blockStateNew
blockStatePending
blockStateReceived
blockStateProcessed
)
type scBlockRequestMessage struct {
@ -67,71 +67,75 @@ type peerState int
const (
peerStateNew = iota
peerStateReady,
peerStateReady
peerStateRemoved
)
type scPeer struct {
peerID p2p.ID
state scPeerState
state peerState
height int64
lastTouched time.Time
monitor flow.Monitor
lastRate int64
}
func newScPeer(peerID p2p.ID) *scPeer {
return &scPeer{
peerID: peerID,
state: peerStateNew,
height: -1,
}
}
type schedule struct {
minHeight int64
initHeight int64
// a list of blocks in which blockState
blockStates map[height]blockState
blockStates map[int64]blockState
// a map of peerID to schedule specific peer struct `scPeer`
peers map[p2p.ID]scPeer
peers map[p2p.ID]*scPeer
// a map of heights to the peer we are waiting for a response from
pending map[height]p2p.ID
targetPending uint // the number of blocks we want in blockStatePending
targetReceived uint // the number of blocks we want in blockStateReceived
pending map[int64]p2p.ID
pendingTime map[int64]time.Time
peerTimeout uint
peerMinSpeed uint
}
func newSchedule(minHeight int64, targetPending uint, targetReceived uint) *schedule {
func newSchedule(initHeight int64) *schedule {
sc := schedule{
minHeight: minHeight,
targetPending: targetPending,
targetReceived: targetReceived,
initHeight: initHeight,
}
sc.setStateAtHeight(minHeight, blockStateNew)
sc.setStateAtHeight(initHeight, blockStateNew)
return &sc
}
func (sc *schedule) addPeer(peerID p2p.ID) error {
if ok, _ := sc.peers[peerID]; ok {
if _, ok := sc.peers[peerID]; ok {
return errDuplicatePeer
}
peers[peerID] = newScPeer(peerID)
sc.peers[peerID] = newScPeer(peerID)
return nil
}
func (sc *schedule) touchPeer(peerID p2p.ID, time time.Time) error {
if ok, peer := sc.peers[peerID]; !ok && peer.state == peerStateRemoved {
var peer scPeer
if peer, ok := sc.peers[peerID]; !ok && peer.state == peerStateRemoved {
return errPeerNotFound
}
peer.lastTouched = time
return nil
}
func (sc *schedule) removePeer(peerID p2p.ID) error {
if ok, peer := sc.peers[peerID]; !ok {
var peer scPeer
if peer, ok := sc.peers[peerID]; !ok {
return errPeerNotFound
}
@ -141,7 +145,7 @@ func (sc *schedule) removePeer(peerID p2p.ID) error {
for height, pendingPeerID := range sc.pending {
if peerID == pendingPeerID {
delete(sc.pending[height])
delete(sc.pending, height)
sc.blockStates[height] = blockStateNew
}
}
@ -152,7 +156,8 @@ func (sc *schedule) removePeer(peerID p2p.ID) error {
}
func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error {
if ok, peer := sc.peers[peerID]; !ok || peer.state != peerStateRemoved {
var peer scPeer
if peer, ok := sc.peers[peerID]; !ok || peer.state != peerStateRemoved {
return errors.New("new peer not found")
}
@ -163,8 +168,8 @@ func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error {
peer.height = height
peer.state = peerStateReady
for i := sc.minHeight(); i <= height; i++ {
if !sc.blockState[i] {
sc.blockState[i] = blockStateNew
if sc.getStateAtHeight(i) == blockStateUnknown {
sc.setStateAtHeight(i, blockStateNew)
}
}
@ -172,17 +177,17 @@ func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error {
}
func (sc *schedule) getStateAtHeight(height int64) blockState {
if height < sc.minHeight {
if height < sc.initHeight {
return blockStateProcessed
} else if ok, state := sc.blockState[height]; ok {
} else if state, ok := sc.blockStates[height]; ok {
return state
} else {
return blockStateUnknown
}
}
func (sc *schedule) getPeersAtHeight(height int64) []scPeer {
peers := []scPeer{}
func (sc *schedule) getPeersAtHeight(height int64) []*scPeer {
peers := []*scPeer{}
for perrID, peer := range sc.peers {
if peer.height >= height {
peers = append(peers, peer)
@ -193,10 +198,10 @@ func (sc *schedule) getPeersAtHeight(height int64) []scPeer {
}
// XXX: probably needs a better name
func (sc *schedule) peersSince(duration time.Duration, now time.Time) []scPeer {
peers := []scPeer{}
func (sc *schedule) peersSince(duration time.Duration, now time.Time) []*scPeer {
peers := []*scPeer{}
for id, peer := range sc.peers {
if now-peer.lastTouched > duration {
if now.Sub(peer.lastTouched) > duration {
peers = append(peers, peer)
}
}
@ -208,8 +213,10 @@ func (sc *schedule) setStateAtHeight(height int64, state blockState) {
sc.blockStates[height] = state
}
func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64) error {
if ok, peer := sc.peers[peerID]; !ok || peer.state != peerStateReady {
// TODO keep track of when i received this block
func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error {
var peer scPeer
if peer, ok := sc.peers[peerID]; !ok || peer.state != peerStateReady {
return errPeerNotFound
}
@ -218,21 +225,29 @@ func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64) error
return errBadSchedule
}
// TODO: download speed
// check if the block is pending from that peer
if sc.pending[height] != peerID {
return errBadSchedule
}
var pendingTime time.Time
if pendingTime, ok := sc.pendingTime[height]; !ok || pendingTime.Sub(now) < 0 {
return errBadSchedule
}
peer.lastRate = size / int64(now.Sub(pendingTime).Seconds())
sc.setStateAtHeight(height, blockStateReceived)
delete(sc.pending[height])
delete(sc.pending, height)
delete(sc.pendingTime, height)
return nil
}
func (sc *schedule) markPending(peerID p2p.ID, height int64, size int64) error {
if ok, peer := sc.peers[peerID]; !ok || peer.state != peerStateReady {
// todo keep track of when i requested this block
func (sc *schedule) markPending(peerID p2p.ID, height int64, time time.Time) error {
var peer scPeer
if peer, ok := sc.peers[peerID]; !ok || peer.state != peerStateReady {
return errPeerNotFound
}
@ -242,7 +257,10 @@ func (sc *schedule) markPending(peerID p2p.ID, height int64, size int64) error {
}
sc.setStateAtHeight(height, blockStatePending)
sc.pending[height] = peer
sc.pending[height] = peerID
// XXX: to make htis more accurate we can introduce a message from
// the IO rutine which indicates the time the request was put on the wire
sc.pendingTime[height] = time
return nil
}
@ -253,6 +271,8 @@ func (sc *schedule) markProcessed(height int64) error {
}
sc.setStateAtHeight(height, blockStateProcessed)
return nil
}
func (sc *schedule) allBlocksProcessed() bool {
@ -266,7 +286,7 @@ func (sc *schedule) allBlocksProcessed() bool {
// heighter block | state == blockStateNew
func (sc *schedule) maxHeight() int64 {
max := 0
var max int64 = 0
for height, state := range sc.blockStates {
if state == blockStateNew && height > max {
max = height
@ -278,7 +298,7 @@ func (sc *schedule) maxHeight() int64 {
// lowest block | state == blockStateNew
func (sc *schedule) minHeight() int64 {
min := sc.minHeight
min := sc.initHeight
for height, state := range sc.blockStates {
if state == blockStateNew && height < min {
min = height
@ -288,23 +308,36 @@ func (sc *schedule) minHeight() int64 {
return min
}
func (sc *schedule) pendingFrom(peerID p2p.ID) {
return sc.pending[peerID]
// XXX: THis is not yet used
func (sc *schedule) pendingFrom(peerID p2p.ID) []int64 {
heights := []int64{}
for height, pendingPeerID := range sc.pending {
if pendingPeerID == peerID {
heights = append(heights, height)
}
}
return heights
}
// XXX: Split up read and write paths here
func (sc *schedule) resetBlocks(peerID p2p.ID) error {
if ok, peer := sc.peers[peerID]; !ok {
var peer scPeer
if peer, ok := sc.peers[peerID]; !ok {
return errPeerNotFound
}
for _, height := range sc.pending[peerID] {
sc.setStateAtHeight(height, blockStateNew)
// this should use pendingFrom
for height, pendingPeerID := range sc.pending {
if pendingPeerID == peerID {
sc.setStateAtHeight(height, blockStateNew)
}
}
return nil
}
func (sc *schedule) selectPeer(peers []scPeer) scPeer {
// FIXME: properPeerSelector
r.Intn(len(reasons))
s := rand.NewSource(time.Now().Unix())
r := rand.New(s)
@ -317,8 +350,8 @@ func (sc *schedule) prunablePeers(time time.Time, minSpeed int) []p2p.ID {
}
func (sc *schedule) numBlockInState(targetState blockState) uint32 {
num := 0
for height, state := range sc.blockState {
var num uint32 = 0
for height, state := range sc.blockStates {
if state == targetState {
num++
}
@ -327,45 +360,53 @@ func (sc *schedule) numBlockInState(targetState blockState) uint32 {
}
type Scheduler struct {
sc schedule
sc *schedule
targetPending uint // the number of blocks we want in blockStatePending
targetReceived uint // the number of blocks we want in blockStateReceived
}
func NewScheduler(minHeight int64, targetPending uint, targetReceived uint) *Scheduler {
return &Scheduler{
sc: newSchedule(minHeight, targetPending, targetReceived),
sc: newSchedule(minHeight),
targetPending: targetPending,
targetReceived: targetReceived,
}
}
func (sdr *Scheduler) handleAddPeer(peerID p2p.ID) []Event {
type Skip struct {
}
func (sdr *Scheduler) handleAddPeer(peerID p2p.ID) Event {
err := sdr.sc.addPeer(peerID)
if err != nil {
[]Event{schedulerErrorEv{peerID, err}}
return schedulerErrorEv{peerID, err}
}
return []Event{}
return Skip{}
}
func (sdr *Scheduler) handleRemovePeer(peerID p2p.ID) []Events {
func (sdr *Scheduler) handleRemovePeer(peerID p2p.ID) Event {
err := sdr.sc.removePeer(peerID)
if err != nil {
[]Event{schedulerErrorEv{peerID, err}}
return schedulerErrorEv{peerID, err}
}
return []Event{}
return Skip{}
}
func (sdr *Scheduler) handleStatusResponse(peerID p2p.ID) []Event {
err := sdr.sc.touchPeer(peerID, time)
func (sdr *Scheduler) handleStatusResponse(peerID p2p.ID, height int64, now time.Time) Event {
err := sdr.sc.touchPeer(peerID, now)
if err != nil {
return []Event{schedulerErrorEv{peerID, err}}
return schedulerErrorEv{peerID, err}
}
err = sdr.sc.setPeerHeight(peerID, height)
if err != nil {
return []Event{schedulerErrorEv{peerID, err}}
return schedulerErrorEv{peerID, err}
}
return []Event{}
return Skip{}
}
type bcBlockResponseEv struct {
@ -375,56 +416,60 @@ type bcBlockResponseEv struct {
msgSize int64
}
func (sdr *Scheduler) handleBlockResponse(peerID p2p.ID, msg *bcBlockResponse) []Event {
err := scr.sc.touchPeer(peerID, time)
type scBlockReceivedEv struct {
peerID p2p.ID
}
func (sdr *Scheduler) handleBlockResponse(peerID p2p.ID, msg *bcBlockResponseEv, now time.Time) Event {
err := sdr.sc.touchPeer(peerID, now)
if err != nil {
return []Event{schedulerErrorEv{peerID, err}}
return schedulerErrorEv{peerID, err}
}
err = sdr.sc.markReceived(peerID, height, msg)
err = sdr.sc.markReceived(peerID, msg.height, msg.msgSize, now)
if err != nil {
return []Event{schedulerErrorEv{peerID, err}}
return schedulerErrorEv{peerID, err}
}
return []Event{scBlockReceivedEv{peerID}}
return scBlockReceivedEv{peerID}
}
type scFinishedEv struct{}
func (sdr *Scheduler) handleBlockProcessed(peerID p2p.ID, height int64) []Events {
err = sdr.sc.markProcessed(peerID, height)
func (sdr *Scheduler) handleBlockProcessed(peerID p2p.ID, height int64) Event {
err := sdr.sc.markProcessed(height)
if err != nil {
return []Event{schedulerErrorEv{peerID, err}}
return schedulerErrorEv{peerID, err}
}
if sdr.sc.allBlocksProcessed() {
return []Event{scFinishedEv{}}
return scFinishedEv{}
}
return []Event{}
return Skip{}
}
func (sdr *Scheduler) handleBlockProcessError(peerID p2p.ID, height int64) []Event {
func (sdr *Scheduler) handleBlockProcessError(peerID p2p.ID, height int64) Event {
// remove the peer
sc.removePeer(peerID)
sdr.sc.removePeer(peerID)
// reSchdule all the blocks we are waiting
/*
XXX: This is wrong as we need to
foreach block where state != blockStateProcessed
state => blockStateNew
*/
sc.resetBlocks(peerID)
sdr.sc.resetBlocks(peerID)
return []Event{}
return Skip{}
}
func (sdr *Scheduler) handleTimeCheck(peerID p2p.ID) []Events {
func (sdr *Scheduler) handleTimeCheck(peerID p2p.ID) []Event {
// prune peers
// TODO
// produce new schedule
events := []scBlockRequestMessage{}
pendingBlock := sdr.sc.numBlockInState(blockStatePending)
pendingBlocks := sdr.sc.numBlockInState(blockStatePending)
receivedBlocks := sdr.sc.numBlockInState(blockStateReceived)
todo := math.Min(sdr.targetPending-pendingBlocks, sdr.targetReceived-receivedBlocks)
for height := sdr.sc.minHeight(); height <= sdr.sc.maxHeight(); height++ {