mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-29 20:51:45 +00:00
all tests pass
This commit is contained in:
@ -2,6 +2,7 @@ package v2
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"time"
|
"time"
|
||||||
@ -51,13 +52,31 @@ type schedulerErrorEv struct {
|
|||||||
type blockState int
|
type blockState int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
blockStateUnknown = iota
|
blockStateUnknown blockState = iota
|
||||||
blockStateNew
|
blockStateNew
|
||||||
blockStatePending
|
blockStatePending
|
||||||
blockStateReceived
|
blockStateReceived
|
||||||
blockStateProcessed
|
blockStateProcessed
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (e blockState) String() string {
|
||||||
|
switch e {
|
||||||
|
case blockStateUnknown:
|
||||||
|
return "Unknown"
|
||||||
|
case blockStateNew:
|
||||||
|
return "New"
|
||||||
|
case blockStatePending:
|
||||||
|
return "Pending"
|
||||||
|
case blockStateReceived:
|
||||||
|
return "Received"
|
||||||
|
case blockStateProcessed:
|
||||||
|
return "Processed"
|
||||||
|
default:
|
||||||
|
// XXX: panic?
|
||||||
|
return fmt.Sprintf("default %d", e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type scBlockRequestMessage struct {
|
type scBlockRequestMessage struct {
|
||||||
peerID p2p.ID
|
peerID p2p.ID
|
||||||
height int64
|
height int64
|
||||||
@ -97,9 +116,11 @@ type schedule struct {
|
|||||||
peers map[p2p.ID]*scPeer
|
peers map[p2p.ID]*scPeer
|
||||||
|
|
||||||
// a map of heights to the peer we are waiting for a response from
|
// a map of heights to the peer we are waiting for a response from
|
||||||
pending map[int64]p2p.ID
|
pendingBlocks map[int64]p2p.ID
|
||||||
pendingTime map[int64]time.Time
|
pendingTime map[int64]time.Time
|
||||||
|
|
||||||
|
receivedBlocks map[int64]p2p.ID
|
||||||
|
|
||||||
peerTimeout uint
|
peerTimeout uint
|
||||||
peerMinSpeed uint
|
peerMinSpeed uint
|
||||||
}
|
}
|
||||||
@ -109,8 +130,9 @@ func newSchedule(initHeight int64) *schedule {
|
|||||||
initHeight: initHeight,
|
initHeight: initHeight,
|
||||||
blockStates: make(map[int64]blockState),
|
blockStates: make(map[int64]blockState),
|
||||||
peers: make(map[p2p.ID]*scPeer),
|
peers: make(map[p2p.ID]*scPeer),
|
||||||
pending: make(map[int64]p2p.ID),
|
pendingBlocks: make(map[int64]p2p.ID),
|
||||||
pendingTime: make(map[int64]time.Time),
|
pendingTime: make(map[int64]time.Time),
|
||||||
|
receivedBlocks: make(map[int64]p2p.ID),
|
||||||
}
|
}
|
||||||
|
|
||||||
sc.setStateAtHeight(initHeight, blockStateNew)
|
sc.setStateAtHeight(initHeight, blockStateNew)
|
||||||
@ -123,13 +145,12 @@ func (sc *schedule) addPeer(peerID p2p.ID) error {
|
|||||||
return errDuplicatePeer
|
return errDuplicatePeer
|
||||||
}
|
}
|
||||||
sc.peers[peerID] = newScPeer(peerID)
|
sc.peers[peerID] = newScPeer(peerID)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *schedule) touchPeer(peerID p2p.ID, time time.Time) error {
|
func (sc *schedule) touchPeer(peerID p2p.ID, time time.Time) error {
|
||||||
var peer scPeer
|
peer, ok := sc.peers[peerID]
|
||||||
if peer, ok := sc.peers[peerID]; !ok || peer.state == peerStateRemoved {
|
if !ok || peer.state == peerStateRemoved {
|
||||||
return errPeerNotFound
|
return errPeerNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -139,8 +160,8 @@ func (sc *schedule) touchPeer(peerID p2p.ID, time time.Time) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sc *schedule) removePeer(peerID p2p.ID) error {
|
func (sc *schedule) removePeer(peerID p2p.ID) error {
|
||||||
var peer scPeer
|
peer, ok := sc.peers[peerID]
|
||||||
if peer, ok := sc.peers[peerID]; !ok || peer.state == peerStateRemoved {
|
if !ok || peer.state == peerStateRemoved {
|
||||||
return errPeerNotFound
|
return errPeerNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -148,9 +169,9 @@ func (sc *schedule) removePeer(peerID p2p.ID) error {
|
|||||||
return errPeerRemoved
|
return errPeerRemoved
|
||||||
}
|
}
|
||||||
|
|
||||||
for height, pendingPeerID := range sc.pending {
|
for height, pendingPeerID := range sc.pendingBlocks {
|
||||||
if peerID == pendingPeerID {
|
if peerID == pendingPeerID {
|
||||||
delete(sc.pending, height)
|
delete(sc.pendingBlocks, height)
|
||||||
sc.blockStates[height] = blockStateNew
|
sc.blockStates[height] = blockStateNew
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -161,13 +182,13 @@ func (sc *schedule) removePeer(peerID p2p.ID) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error {
|
func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error {
|
||||||
var peer scPeer
|
peer, ok := sc.peers[peerID]
|
||||||
if peer, ok := sc.peers[peerID]; !ok || peer.state != peerStateRemoved {
|
if !ok || peer.state == peerStateRemoved {
|
||||||
return errors.New("new peer not found")
|
return errPeerNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if height < peer.height {
|
if height < peer.height {
|
||||||
return errors.New("peer height descending")
|
return errPeerLowersItsHeight
|
||||||
}
|
}
|
||||||
|
|
||||||
peer.height = height
|
peer.height = height
|
||||||
@ -220,39 +241,44 @@ func (sc *schedule) setStateAtHeight(height int64, state blockState) {
|
|||||||
|
|
||||||
// TODO keep track of when i received this block
|
// TODO keep track of when i received this block
|
||||||
func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error {
|
func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error {
|
||||||
var peer scPeer
|
peer, ok := sc.peers[peerID]
|
||||||
if peer, ok := sc.peers[peerID]; !ok || peer.state != peerStateReady {
|
if !ok || peer.state != peerStateReady {
|
||||||
return errPeerNotFound
|
return errPeerNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
if sc.getStateAtHeight(height) != blockStatePending {
|
// it looks like height and size are being interchanged s
|
||||||
|
if state := sc.getStateAtHeight(height); state != blockStatePending {
|
||||||
// received a block not in pending
|
// received a block not in pending
|
||||||
|
// XXX: can we have more specialized errors here?
|
||||||
return errBadSchedule
|
return errBadSchedule
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if the block is pending from that peer
|
// check if the block is pending from that peer
|
||||||
if sc.pending[height] != peerID {
|
if sc.pendingBlocks[height] != peerID {
|
||||||
return errBadSchedule
|
return errBadSchedule
|
||||||
}
|
}
|
||||||
|
|
||||||
var pendingTime time.Time
|
pendingTime, ok := sc.pendingTime[height]
|
||||||
if pendingTime, ok := sc.pendingTime[height]; !ok || pendingTime.Sub(now) < 0 {
|
if !ok || now.Sub(pendingTime) < 0 {
|
||||||
|
// xxx: better errors
|
||||||
return errBadSchedule
|
return errBadSchedule
|
||||||
}
|
}
|
||||||
|
|
||||||
peer.lastRate = size / int64(now.Sub(pendingTime).Seconds())
|
peer.lastRate = size / int64(now.Sub(pendingTime).Seconds())
|
||||||
|
|
||||||
sc.setStateAtHeight(height, blockStateReceived)
|
sc.setStateAtHeight(height, blockStateReceived)
|
||||||
delete(sc.pending, height)
|
delete(sc.pendingBlocks, height)
|
||||||
delete(sc.pendingTime, height)
|
delete(sc.pendingTime, height)
|
||||||
|
|
||||||
|
sc.receivedBlocks[height] = peerID
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo keep track of when i requested this block
|
// todo keep track of when i requested this block
|
||||||
func (sc *schedule) markPending(peerID p2p.ID, height int64, time time.Time) error {
|
func (sc *schedule) markPending(peerID p2p.ID, height int64, time time.Time) error {
|
||||||
var peer scPeer
|
peer, ok := sc.peers[peerID]
|
||||||
if peer, ok := sc.peers[peerID]; !ok || peer.state != peerStateReady {
|
if !ok || peer.state != peerStateReady {
|
||||||
return errPeerNotFound
|
return errPeerNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -262,7 +288,7 @@ func (sc *schedule) markPending(peerID p2p.ID, height int64, time time.Time) err
|
|||||||
}
|
}
|
||||||
|
|
||||||
sc.setStateAtHeight(height, blockStatePending)
|
sc.setStateAtHeight(height, blockStatePending)
|
||||||
sc.pending[height] = peerID
|
sc.pendingBlocks[height] = peerID
|
||||||
// XXX: to make htis more accurate we can introduce a message from
|
// XXX: to make htis more accurate we can introduce a message from
|
||||||
// the IO rutine which indicates the time the request was put on the wire
|
// the IO rutine which indicates the time the request was put on the wire
|
||||||
sc.pendingTime[height] = time
|
sc.pendingTime[height] = time
|
||||||
@ -303,7 +329,7 @@ func (sc *schedule) maxHeight() int64 {
|
|||||||
|
|
||||||
// lowest block | state == blockStateNew
|
// lowest block | state == blockStateNew
|
||||||
func (sc *schedule) minHeight() int64 {
|
func (sc *schedule) minHeight() int64 {
|
||||||
min := sc.initHeight
|
var min int64 = math.MaxInt64
|
||||||
for height, state := range sc.blockStates {
|
for height, state := range sc.blockStates {
|
||||||
if state == blockStateNew && height < min {
|
if state == blockStateNew && height < min {
|
||||||
min = height
|
min = height
|
||||||
@ -313,10 +339,9 @@ func (sc *schedule) minHeight() int64 {
|
|||||||
return min
|
return min
|
||||||
}
|
}
|
||||||
|
|
||||||
// XXX: THis is not yet used
|
|
||||||
func (sc *schedule) pendingFrom(peerID p2p.ID) []int64 {
|
func (sc *schedule) pendingFrom(peerID p2p.ID) []int64 {
|
||||||
heights := []int64{}
|
heights := []int64{}
|
||||||
for height, pendingPeerID := range sc.pending {
|
for height, pendingPeerID := range sc.pendingBlocks {
|
||||||
if pendingPeerID == peerID {
|
if pendingPeerID == peerID {
|
||||||
heights = append(heights, height)
|
heights = append(heights, height)
|
||||||
}
|
}
|
||||||
@ -326,13 +351,20 @@ func (sc *schedule) pendingFrom(peerID p2p.ID) []int64 {
|
|||||||
|
|
||||||
// XXX: What about pedingTime here?
|
// XXX: What about pedingTime here?
|
||||||
// XXX: Split up read and write paths here
|
// XXX: Split up read and write paths here
|
||||||
|
// set any blocks in blockStatePending or blockStateReceived by peerID to blockStateNew
|
||||||
func (sc *schedule) resetBlocks(peerID p2p.ID) error {
|
func (sc *schedule) resetBlocks(peerID p2p.ID) error {
|
||||||
if _, ok := sc.peers[peerID]; !ok {
|
if _, ok := sc.peers[peerID]; !ok {
|
||||||
return errPeerNotFound
|
return errPeerNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
// this should use pendingFrom
|
// this should use pendingFrom
|
||||||
for height, pendingPeerID := range sc.pending {
|
for height, pendingPeerID := range sc.pendingBlocks {
|
||||||
|
if pendingPeerID == peerID {
|
||||||
|
sc.setStateAtHeight(height, blockStateNew)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for height, pendingPeerID := range sc.receivedBlocks {
|
||||||
if pendingPeerID == peerID {
|
if pendingPeerID == peerID {
|
||||||
sc.setStateAtHeight(height, blockStateNew)
|
sc.setStateAtHeight(height, blockStateNew)
|
||||||
}
|
}
|
||||||
|
@ -19,9 +19,9 @@ const (
|
|||||||
func TestScheduleInit(t *testing.T) {
|
func TestScheduleInit(t *testing.T) {
|
||||||
sc := newSchedule(initHeight)
|
sc := newSchedule(initHeight)
|
||||||
|
|
||||||
assert.Equal(t, sc.getStateAtHeight(initHeight), blockStateNew)
|
assert.Equal(t, blockStateNew, sc.getStateAtHeight(initHeight))
|
||||||
assert.Equal(t, sc.getStateAtHeight(initHeight+1), blockStateProcessed)
|
assert.Equal(t, blockStateProcessed, sc.getStateAtHeight(initHeight-1))
|
||||||
assert.Equal(t, sc.getStateAtHeight(initHeight-1), blockStateUnknown)
|
assert.Equal(t, blockStateUnknown, sc.getStateAtHeight(initHeight+1))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAddPeer(t *testing.T) {
|
func TestAddPeer(t *testing.T) {
|
||||||
@ -44,8 +44,9 @@ func TestTouchPeer(t *testing.T) {
|
|||||||
assert.Nil(t, sc.touchPeer(peerID, now),
|
assert.Nil(t, sc.touchPeer(peerID, now),
|
||||||
"Touching a peer should return no error")
|
"Touching a peer should return no error")
|
||||||
|
|
||||||
|
// the peer should
|
||||||
threshold := 10 * time.Second
|
threshold := 10 * time.Second
|
||||||
assert.Equal(t, len(sc.peersSince(threshold, now.Add(9*time.Second))), 0,
|
assert.Equal(t, 0, len(sc.peersSince(threshold, now.Add(9*time.Second))),
|
||||||
"Expected no peers to have been touched over 9 seconds")
|
"Expected no peers to have been touched over 9 seconds")
|
||||||
assert.Equal(t, peerID, sc.peersSince(threshold, now.Add(11*time.Second))[0].peerID,
|
assert.Equal(t, peerID, sc.peersSince(threshold, now.Add(11*time.Second))[0].peerID,
|
||||||
"Expected one peer to have been touched over 10 seconds ago")
|
"Expected one peer to have been touched over 10 seconds ago")
|
||||||
@ -54,6 +55,8 @@ func TestTouchPeer(t *testing.T) {
|
|||||||
func TestPeerHeight(t *testing.T) {
|
func TestPeerHeight(t *testing.T) {
|
||||||
sc := newSchedule(initHeight)
|
sc := newSchedule(initHeight)
|
||||||
|
|
||||||
|
assert.Nil(t, sc.addPeer(peerID),
|
||||||
|
"Adding a peer should return no error")
|
||||||
assert.Nil(t, sc.setPeerHeight(peerID, peerHeight))
|
assert.Nil(t, sc.setPeerHeight(peerID, peerHeight))
|
||||||
for i := initHeight; i <= peerHeight; i++ {
|
for i := initHeight; i <= peerHeight; i++ {
|
||||||
assert.Equal(t, sc.getStateAtHeight(i), blockStateNew,
|
assert.Equal(t, sc.getStateAtHeight(i), blockStateNew,
|
||||||
@ -82,17 +85,18 @@ func TestHeightFSM(t *testing.T) {
|
|||||||
|
|
||||||
assert.Equal(t, blockStateUnknown, sc.getStateAtHeight(peerHeight+10),
|
assert.Equal(t, blockStateUnknown, sc.getStateAtHeight(peerHeight+10),
|
||||||
"Expected the maximum height seen + 10 to be in blockStateUnknown")
|
"Expected the maximum height seen + 10 to be in blockStateUnknown")
|
||||||
|
|
||||||
assert.Equal(t, errBadSchedule, sc.markPending(peerID, peerHeight+10, now.Add(1*time.Second)),
|
assert.Equal(t, errBadSchedule, sc.markPending(peerID, peerHeight+10, now.Add(1*time.Second)),
|
||||||
"Expected markPending on block in blockStateUnknown height to fail")
|
"Expected markPending on block in blockStateUnknown height to fail")
|
||||||
|
|
||||||
assert.Nil(t, sc.markPending(peerID, initHeight, now.Add(1*time.Second)),
|
assert.Nil(t, sc.markPending(peerID, initHeight, now.Add(1*time.Second)),
|
||||||
"Expected markPending on a known height with a known peer to return no error")
|
"Expected markPending on a known height with a known peer to return no error")
|
||||||
assert.Equal(t, blockStatePending, sc.getStateAtHeight(initHeight),
|
assert.Equal(t, blockStatePending, sc.getStateAtHeight(initHeight),
|
||||||
"Expected a the markedBlock to be in blockStatePending")
|
"Expected a the markedBlock to be in blockStatePending")
|
||||||
|
|
||||||
assert.Nil(t, sc.markReceived(peerID, initHeight, blockSize, now.Add(2*time.Second)),
|
assert.Nil(t, sc.markReceived(peerID, initHeight, blockSize, now.Add(2*time.Second)),
|
||||||
"Expected a marking a pending block received to return no error")
|
"Expected marking markReceived on a pending block to return no error")
|
||||||
|
|
||||||
|
// here we are trying to reset blocks that were received
|
||||||
assert.Nil(t, sc.resetBlocks(peerID),
|
assert.Nil(t, sc.resetBlocks(peerID),
|
||||||
"Expected resetBlocks to return no error")
|
"Expected resetBlocks to return no error")
|
||||||
assert.Equal(t, blockStateNew, sc.getStateAtHeight(initHeight),
|
assert.Equal(t, blockStateNew, sc.getStateAtHeight(initHeight),
|
||||||
@ -103,7 +107,7 @@ func TestHeightFSM(t *testing.T) {
|
|||||||
assert.Equal(t, blockStatePending, sc.getStateAtHeight(initHeight),
|
assert.Equal(t, blockStatePending, sc.getStateAtHeight(initHeight),
|
||||||
"Expected block to be in blockStatePending")
|
"Expected block to be in blockStatePending")
|
||||||
|
|
||||||
assert.Nil(t, sc.markReceived(peerID, blockSize, initHeight, now.Add(2*time.Second)),
|
assert.Nil(t, sc.markReceived(peerID, initHeight, blockSize, now.Add(2*time.Second)),
|
||||||
"Expected marking a pending block as received to return no error")
|
"Expected marking a pending block as received to return no error")
|
||||||
assert.Equal(t, blockStateReceived, sc.getStateAtHeight(initHeight))
|
assert.Equal(t, blockStateReceived, sc.getStateAtHeight(initHeight))
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user