mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-28 04:01:40 +00:00
flesh out scheduler
This commit is contained in:
@ -7,17 +7,49 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/tendermint/tendermint/p2p"
|
"github.com/tendermint/tendermint/p2p"
|
||||||
|
"github.com/tendermint/tendermint/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type height int64
|
type height int64
|
||||||
|
|
||||||
|
// errors
|
||||||
|
var (
|
||||||
|
// internal to the package
|
||||||
|
errNoErrorFinished = errors.New("fast sync is finished")
|
||||||
|
errInvalidEvent = errors.New("invalid event in current state")
|
||||||
|
errMissingBlock = errors.New("missing blocks")
|
||||||
|
errNilPeerForBlockRequest = errors.New("peer for block request does not exist in the switch")
|
||||||
|
errSendQueueFull = errors.New("block request not made, send-queue is full")
|
||||||
|
errPeerTooShort = errors.New("peer height too low, old peer removed/ new peer not added")
|
||||||
|
errSwitchRemovesPeer = errors.New("switch is removing peer")
|
||||||
|
errTimeoutEventWrongState = errors.New("timeout event for a state different than the current one")
|
||||||
|
errNoTallerPeer = errors.New("fast sync timed out on waiting for a peer taller than this node")
|
||||||
|
|
||||||
|
// reported eventually to the switch
|
||||||
|
errPeerLowersItsHeight = errors.New("fast sync peer reports a height lower than previous") // handle return
|
||||||
|
errNoPeerResponseForCurrentHeights = errors.New("fast sync timed out on peer block response for current heights") // handle return
|
||||||
|
errNoPeerResponse = errors.New("fast sync timed out on peer block response") // xx
|
||||||
|
errBadDataFromPeer = errors.New("fast sync received block from wrong peer or block is bad") // xx
|
||||||
|
errDuplicateBlock = errors.New("fast sync received duplicate block from peer")
|
||||||
|
errBlockVerificationFailure = errors.New("fast sync block verification failure") // xx
|
||||||
|
errSlowPeer = errors.New("fast sync peer is not sending us data fast enough") // xx
|
||||||
|
|
||||||
|
)
|
||||||
|
|
||||||
|
type Event interface{}
|
||||||
|
type schedulerErrorEv struct {
|
||||||
|
peerID p2p.ID
|
||||||
|
error error
|
||||||
|
}
|
||||||
|
|
||||||
type blockState int
|
type blockState int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
blockStateNew = iota
|
blockStateUnknown = iota,
|
||||||
blockStatePending,
|
blockStateNew,
|
||||||
blockStateReceived,
|
blockStatePending,
|
||||||
blockStateProcessed
|
blockStateReceived,
|
||||||
|
blockStateProcessed
|
||||||
)
|
)
|
||||||
|
|
||||||
type scBlockRequestMessage struct {
|
type scBlockRequestMessage struct {
|
||||||
@ -52,9 +84,6 @@ type schedule struct {
|
|||||||
// a list of blocks in which blockState
|
// a list of blocks in which blockState
|
||||||
blockStates map[height]blockState
|
blockStates map[height]blockState
|
||||||
|
|
||||||
// a map of which blocks are available from which peers
|
|
||||||
blockPeers map[height]map[p2p.ID]scPeer
|
|
||||||
|
|
||||||
// a map of peerID to schedule specific peer struct `scPeer`
|
// a map of peerID to schedule specific peer struct `scPeer`
|
||||||
peers map[p2p.ID]scPeer
|
peers map[p2p.ID]scPeer
|
||||||
|
|
||||||
@ -100,7 +129,6 @@ func (sc *schedule) removePeer(peerID p2p.ID) error {
|
|||||||
return errors.New("try to remove a removed peer")
|
return errors.New("try to remove a removed peer")
|
||||||
}
|
}
|
||||||
|
|
||||||
// cleanup blockStates & pending
|
|
||||||
for height, pendingPeerID := range sc.pending {
|
for height, pendingPeerID := range sc.pending {
|
||||||
if peerID == pendingPeerID {
|
if peerID == pendingPeerID {
|
||||||
delete(sc.pending[height])
|
delete(sc.pending[height])
|
||||||
@ -113,7 +141,7 @@ func (sc *schedule) removePeer(peerID p2p.ID) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) {
|
func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error {
|
||||||
if ok, peer := sc.peers[peerID]; !ok || peer.state != peerStateRemoved {
|
if ok, peer := sc.peers[peerID]; !ok || peer.state != peerStateRemoved {
|
||||||
return errors.New("new peer not found")
|
return errors.New("new peer not found")
|
||||||
}
|
}
|
||||||
@ -128,9 +156,32 @@ func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) {
|
|||||||
if !sc.blockState[i] {
|
if !sc.blockState[i] {
|
||||||
sc.blockState[i] = blockStateNew
|
sc.blockState[i] = blockStateNew
|
||||||
}
|
}
|
||||||
|
|
||||||
sc.blockPeers[i][peerID] = peer
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sc *schedule) getStateAtHeight(height int64) blockState {
|
||||||
|
if ok, state := sc.blockState[height]; ok {
|
||||||
|
return state
|
||||||
|
} else {
|
||||||
|
return blockStateUnknown
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sc *schedule) getPeersAtHeight(height int64) []scPeer {
|
||||||
|
peers := []scPeer{}
|
||||||
|
for perrID, peer := range sc.peers {
|
||||||
|
if peer.height >= height {
|
||||||
|
peers = append(peers, peer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return peers
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sc *schedule) setStateAtHeight(height int64, state blockState) {
|
||||||
|
sc.blockStates[height] = state
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64) error {
|
func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64) error {
|
||||||
@ -138,7 +189,7 @@ func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64) error
|
|||||||
return errors.New("received a block from an unknown peer")
|
return errors.New("received a block from an unknown peer")
|
||||||
}
|
}
|
||||||
|
|
||||||
if blockState[height] != blockStatePending {
|
if sc.getStateAtHeight(height) != blockStatePending {
|
||||||
return errors.New("received an unrequested block")
|
return errors.New("received an unrequested block")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -149,18 +200,32 @@ func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64) error
|
|||||||
return errors.New("Didn't request this block from this peer")
|
return errors.New("Didn't request this block from this peer")
|
||||||
}
|
}
|
||||||
|
|
||||||
blockState[i] = blockStateReceived
|
sc.setStateAtHeight(height, blockStateReceived)
|
||||||
delete(sc.pending[height])
|
delete(sc.pending[height])
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sc *schedule) markPending(peerID p2p.ID, height int64, size int64) error {
|
||||||
|
sc.setStateAtHeight(height, blockStatePending)
|
||||||
|
sc.pending[height] = peer
|
||||||
|
}
|
||||||
|
|
||||||
func (sc *schedule) markProcessed(height int64) error {
|
func (sc *schedule) markProcessed(height int64) error {
|
||||||
if sc.blockState[height] != received {
|
if sc.getStateAtHeight(height) != blockStateReceived {
|
||||||
return errors.New("Block was processed without being received")
|
return errors.New("Block was processed without being received")
|
||||||
}
|
}
|
||||||
|
|
||||||
sc.blockState[height] = blockStateProcessed
|
sc.setStateAtHeight(height, blockStateProcessed)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sc *schedule) allBlocksProcessed() bool {
|
||||||
|
for height, state := range sc.blockStates {
|
||||||
|
if state != blockStateProcessed {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// heighter block | state == blockStateNew
|
// heighter block | state == blockStateNew
|
||||||
@ -202,7 +267,7 @@ func (sc *schedule) resetBlocks(peerID p2p.ID) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sc *schedule) selectPeer(peers []scPeer) scPeer {
|
func (sc *schedule) selectPeer(peers []scPeer) scPeer {
|
||||||
// TODO
|
// FIXME: properPeerSelector
|
||||||
r.Intn(len(reasons))
|
r.Intn(len(reasons))
|
||||||
s := rand.NewSource(time.Now().Unix())
|
s := rand.NewSource(time.Now().Unix())
|
||||||
r := rand.New(s)
|
r := rand.New(s)
|
||||||
@ -225,67 +290,121 @@ func (sc *schedule) numBlockInState(targetState blockState) uint32 {
|
|||||||
return num
|
return num
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *schedule) popSchedule(maxRequest int) []scBlockRequestMessage {
|
type Scheduler struct {
|
||||||
// We only want to schedule requests such that we have less than sc.targetPending and sc.targetReceived
|
sc schedule
|
||||||
// This ensures we don't saturate the network or flood the processor with unprocessed blocks
|
}
|
||||||
pendingBlock := sc.numBlockInState(blockStatePending)
|
|
||||||
receivedBlocks := sc.numBlockInState(blockStateReceived)
|
func NewScheduler(minHeight int64, targetPending uint, targetReceived uint) *Scheduler {
|
||||||
todo := math.Min(sc.targetPending-pendingBlocks, sc.targetReceived-receivedBlocks)
|
return &Scheduler{
|
||||||
|
sc: newSchedule(minHeight, targetPending, targetReceived),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sdr *Scheduler) handleAddPeer(peerID p2p.ID) []Event {
|
||||||
|
err := sdr.sc.addPeer(peerID)
|
||||||
|
if err != nil {
|
||||||
|
[]Event{schedulerErrorEv{peerID, err}}
|
||||||
|
}
|
||||||
|
|
||||||
|
return []Event{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sdr *Scheduler) handleRemovePeer(peerID p2p.ID) []Events {
|
||||||
|
err := sdr.sc.removePeer(peerID)
|
||||||
|
if err != nil {
|
||||||
|
[]Event{schedulerErrorEv{peerID, err}}
|
||||||
|
}
|
||||||
|
|
||||||
|
return []Event{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sdr *Scheduler) handleStatusResponse(peerID p2p.ID) []Event {
|
||||||
|
err := sdr.sc.touchPeer(peerID, time)
|
||||||
|
if err != nil {
|
||||||
|
return []Event{schedulerErrorEv{peerID, err}}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = sdr.sc.setPeerHeight(peerID, height)
|
||||||
|
if err != nil {
|
||||||
|
return []Event{schedulerErrorEv{peerID, err}}
|
||||||
|
}
|
||||||
|
|
||||||
|
return []Event{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type bcBlockResponseEv struct {
|
||||||
|
peerID p2p.ID
|
||||||
|
height int64
|
||||||
|
block *types.Block
|
||||||
|
msgSize int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sdr *Scheduler) handleBlockResponse(peerID p2p.ID, msg *bcBlockResponse) []Event {
|
||||||
|
err := scr.sc.touchPeer(peerID, time)
|
||||||
|
if err != nil {
|
||||||
|
return []Event{schedulerErrorEv{peerID, err}}
|
||||||
|
}
|
||||||
|
|
||||||
|
err = sdr.sc.markReceived(peerID, height, msg)
|
||||||
|
if err != nil {
|
||||||
|
return []Event{schedulerErrorEv{peerID, err}}
|
||||||
|
}
|
||||||
|
|
||||||
|
return []Event{scBlockReceivedEv{peerID}}
|
||||||
|
}
|
||||||
|
|
||||||
|
type scFinishedEv struct{}
|
||||||
|
|
||||||
|
func (sdr *Scheduler) handleBlockProcessed(peerID p2p.ID, height int64) []Events {
|
||||||
|
err = sdr.sc.markProcessed(peerID, height)
|
||||||
|
if err != nil {
|
||||||
|
return []Event{schedulerErrorEv{peerID, err}}
|
||||||
|
}
|
||||||
|
|
||||||
|
if sdr.sc.allBlocksProcessed() {
|
||||||
|
return []Event{scFinishedEv{}}
|
||||||
|
}
|
||||||
|
|
||||||
|
return []Event{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sdr *Scheduler) handleBlockProcessError(peerID p2p.ID, height int64) []Event {
|
||||||
|
// remove the peer
|
||||||
|
sc.removePeer(peerID)
|
||||||
|
// reSchdule all the blocks we are waiting
|
||||||
|
/*
|
||||||
|
XXX: This is wrong as we need to
|
||||||
|
foreach block where state != blockStateProcessed
|
||||||
|
state => blockStateNew
|
||||||
|
*/
|
||||||
|
sc.resetBlocks(peerID)
|
||||||
|
|
||||||
|
return []Event{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sdr *Scheduler) handleTimeCheck(peerID p2p.ID) []Events {
|
||||||
|
// prune peers
|
||||||
|
// TODO
|
||||||
|
|
||||||
|
// produce new schedule
|
||||||
events := []scBlockRequestMessage{}
|
events := []scBlockRequestMessage{}
|
||||||
for height := sc.minHeight(); height < sc.maxHeight(); height++ {
|
pendingBlock := sdr.sc.numBlockInState(blockStatePending)
|
||||||
|
receivedBlocks := sdr.sc.numBlockInState(blockStateReceived)
|
||||||
|
todo := math.Min(sdr.targetPending-pendingBlocks, sdr.targetReceived-receivedBlocks)
|
||||||
|
for height := sdr.sc.minHeight(); height <= sdr.sc.maxHeight(); height++ {
|
||||||
if todo == 0 {
|
if todo == 0 {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if blockStates[height] == blockStateNew {
|
if sdr.sc.getStateAt(height) == blockStateNew {
|
||||||
peer = sc.selectPeer(blockPeers[i])
|
allPeers := sdr.sc.getPeersAtHeight(height)
|
||||||
sc.blockStates[height] = blockStatePending
|
bestPeer := sdr.sc.selectPeer(allPeers)
|
||||||
sc.pending[height] = peer
|
err := sc.markPending(peerID, height)
|
||||||
events = append(events, scBlockRequestMessage{peerID: peer.peerID, height: i})
|
if err != nil {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
events = append(events, scBlockRequestMessage{peerID: bestPeer.peerID, height: height})
|
||||||
todo--
|
todo--
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return events
|
return events
|
||||||
}
|
}
|
||||||
|
|
||||||
type Scheduler struct {
|
|
||||||
sc schedule
|
|
||||||
}
|
|
||||||
|
|
||||||
// What should the scheduler_test look like?
|
|
||||||
|
|
||||||
/*
|
|
||||||
func addPeer(peerID) {
|
|
||||||
schedule.addPeer(peerID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleStatusResponse(peerID, height, time) {
|
|
||||||
schedule.touchPeer(peerID, time)
|
|
||||||
schedule.setPeerHeight(peerID, height)
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleBlockResponseMessage(peerID, height, block, time) {
|
|
||||||
schedule.touchPeer(peerID, time)
|
|
||||||
schedule.markReceived(peerID, height, size(block))
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleNoBlockResponseMessage(peerID, height, time) {
|
|
||||||
schedule.touchPeer(peerID, time)
|
|
||||||
// reschedule that block, punish peer...
|
|
||||||
}
|
|
||||||
|
|
||||||
func handleTimeCheckEv(time) {
|
|
||||||
// clean peer list
|
|
||||||
|
|
||||||
events = []
|
|
||||||
for peerID := range schedule.peersTouchedSince(time) {
|
|
||||||
pending = schedule.pendingFrom(peerID)
|
|
||||||
schedule.setPeerState(peerID, timedout)
|
|
||||||
schedule.resetBlocks(pending)
|
|
||||||
events = append(events, peerTimeout{peerID})
|
|
||||||
}
|
|
||||||
|
|
||||||
events = append(events, schedule.getSchedule())
|
|
||||||
|
|
||||||
return events
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
Reference in New Issue
Block a user