From e6c5d2ca632920d95a3e9af7b106274d9c9ba03d Mon Sep 17 00:00:00 2001 From: Sean Braithwaite Date: Tue, 16 Jul 2019 18:34:02 +0300 Subject: [PATCH] all tests pass --- blockchain/v2/scheduler.go | 98 ++++++++++++++++++++++----------- blockchain/v2/scheduler_test.go | 18 +++--- 2 files changed, 76 insertions(+), 40 deletions(-) diff --git a/blockchain/v2/scheduler.go b/blockchain/v2/scheduler.go index 26c0c451..55319eb2 100644 --- a/blockchain/v2/scheduler.go +++ b/blockchain/v2/scheduler.go @@ -2,6 +2,7 @@ package v2 import ( "errors" + "fmt" "math" "math/rand" "time" @@ -51,13 +52,31 @@ type schedulerErrorEv struct { type blockState int const ( - blockStateUnknown = iota + blockStateUnknown blockState = iota blockStateNew blockStatePending blockStateReceived blockStateProcessed ) +func (e blockState) String() string { + switch e { + case blockStateUnknown: + return "Unknown" + case blockStateNew: + return "New" + case blockStatePending: + return "Pending" + case blockStateReceived: + return "Received" + case blockStateProcessed: + return "Processed" + default: + // XXX: panic? + return fmt.Sprintf("default %d", e) + } +} + type scBlockRequestMessage struct { peerID p2p.ID height int64 @@ -97,8 +116,10 @@ type schedule struct { peers map[p2p.ID]*scPeer // a map of heights to the peer we are waiting for a response from - pending map[int64]p2p.ID - pendingTime map[int64]time.Time + pendingBlocks map[int64]p2p.ID + pendingTime map[int64]time.Time + + receivedBlocks map[int64]p2p.ID peerTimeout uint peerMinSpeed uint @@ -106,11 +127,12 @@ type schedule struct { func newSchedule(initHeight int64) *schedule { sc := schedule{ - initHeight: initHeight, - blockStates: make(map[int64]blockState), - peers: make(map[p2p.ID]*scPeer), - pending: make(map[int64]p2p.ID), - pendingTime: make(map[int64]time.Time), + initHeight: initHeight, + blockStates: make(map[int64]blockState), + peers: make(map[p2p.ID]*scPeer), + pendingBlocks: make(map[int64]p2p.ID), + pendingTime: make(map[int64]time.Time), + receivedBlocks: make(map[int64]p2p.ID), } sc.setStateAtHeight(initHeight, blockStateNew) @@ -123,13 +145,12 @@ func (sc *schedule) addPeer(peerID p2p.ID) error { return errDuplicatePeer } sc.peers[peerID] = newScPeer(peerID) - return nil } func (sc *schedule) touchPeer(peerID p2p.ID, time time.Time) error { - var peer scPeer - if peer, ok := sc.peers[peerID]; !ok || peer.state == peerStateRemoved { + peer, ok := sc.peers[peerID] + if !ok || peer.state == peerStateRemoved { return errPeerNotFound } @@ -139,8 +160,8 @@ func (sc *schedule) touchPeer(peerID p2p.ID, time time.Time) error { } func (sc *schedule) removePeer(peerID p2p.ID) error { - var peer scPeer - if peer, ok := sc.peers[peerID]; !ok || peer.state == peerStateRemoved { + peer, ok := sc.peers[peerID] + if !ok || peer.state == peerStateRemoved { return errPeerNotFound } @@ -148,9 +169,9 @@ func (sc *schedule) removePeer(peerID p2p.ID) error { return errPeerRemoved } - for height, pendingPeerID := range sc.pending { + for height, pendingPeerID := range sc.pendingBlocks { if peerID == pendingPeerID { - delete(sc.pending, height) + delete(sc.pendingBlocks, height) sc.blockStates[height] = blockStateNew } } @@ -161,13 +182,13 @@ func (sc *schedule) removePeer(peerID p2p.ID) error { } func (sc *schedule) setPeerHeight(peerID p2p.ID, height int64) error { - var peer scPeer - if peer, ok := sc.peers[peerID]; !ok || peer.state != peerStateRemoved { - return errors.New("new peer not found") + peer, ok := sc.peers[peerID] + if !ok || peer.state == peerStateRemoved { + return errPeerNotFound } if height < peer.height { - return errors.New("peer height descending") + return errPeerLowersItsHeight } peer.height = height @@ -220,39 +241,44 @@ func (sc *schedule) setStateAtHeight(height int64, state blockState) { // TODO keep track of when i received this block func (sc *schedule) markReceived(peerID p2p.ID, height int64, size int64, now time.Time) error { - var peer scPeer - if peer, ok := sc.peers[peerID]; !ok || peer.state != peerStateReady { + peer, ok := sc.peers[peerID] + if !ok || peer.state != peerStateReady { return errPeerNotFound } - if sc.getStateAtHeight(height) != blockStatePending { + // it looks like height and size are being interchanged s + if state := sc.getStateAtHeight(height); state != blockStatePending { // received a block not in pending + // XXX: can we have more specialized errors here? return errBadSchedule } // check if the block is pending from that peer - if sc.pending[height] != peerID { + if sc.pendingBlocks[height] != peerID { return errBadSchedule } - var pendingTime time.Time - if pendingTime, ok := sc.pendingTime[height]; !ok || pendingTime.Sub(now) < 0 { + pendingTime, ok := sc.pendingTime[height] + if !ok || now.Sub(pendingTime) < 0 { + // xxx: better errors return errBadSchedule } peer.lastRate = size / int64(now.Sub(pendingTime).Seconds()) sc.setStateAtHeight(height, blockStateReceived) - delete(sc.pending, height) + delete(sc.pendingBlocks, height) delete(sc.pendingTime, height) + sc.receivedBlocks[height] = peerID + return nil } // todo keep track of when i requested this block func (sc *schedule) markPending(peerID p2p.ID, height int64, time time.Time) error { - var peer scPeer - if peer, ok := sc.peers[peerID]; !ok || peer.state != peerStateReady { + peer, ok := sc.peers[peerID] + if !ok || peer.state != peerStateReady { return errPeerNotFound } @@ -262,7 +288,7 @@ func (sc *schedule) markPending(peerID p2p.ID, height int64, time time.Time) err } sc.setStateAtHeight(height, blockStatePending) - sc.pending[height] = peerID + sc.pendingBlocks[height] = peerID // XXX: to make htis more accurate we can introduce a message from // the IO rutine which indicates the time the request was put on the wire sc.pendingTime[height] = time @@ -303,7 +329,7 @@ func (sc *schedule) maxHeight() int64 { // lowest block | state == blockStateNew func (sc *schedule) minHeight() int64 { - min := sc.initHeight + var min int64 = math.MaxInt64 for height, state := range sc.blockStates { if state == blockStateNew && height < min { min = height @@ -313,10 +339,9 @@ func (sc *schedule) minHeight() int64 { return min } -// XXX: THis is not yet used func (sc *schedule) pendingFrom(peerID p2p.ID) []int64 { heights := []int64{} - for height, pendingPeerID := range sc.pending { + for height, pendingPeerID := range sc.pendingBlocks { if pendingPeerID == peerID { heights = append(heights, height) } @@ -326,13 +351,20 @@ func (sc *schedule) pendingFrom(peerID p2p.ID) []int64 { // XXX: What about pedingTime here? // XXX: Split up read and write paths here +// set any blocks in blockStatePending or blockStateReceived by peerID to blockStateNew func (sc *schedule) resetBlocks(peerID p2p.ID) error { if _, ok := sc.peers[peerID]; !ok { return errPeerNotFound } // this should use pendingFrom - for height, pendingPeerID := range sc.pending { + for height, pendingPeerID := range sc.pendingBlocks { + if pendingPeerID == peerID { + sc.setStateAtHeight(height, blockStateNew) + } + } + + for height, pendingPeerID := range sc.receivedBlocks { if pendingPeerID == peerID { sc.setStateAtHeight(height, blockStateNew) } diff --git a/blockchain/v2/scheduler_test.go b/blockchain/v2/scheduler_test.go index 0f67c07e..d625bb4c 100644 --- a/blockchain/v2/scheduler_test.go +++ b/blockchain/v2/scheduler_test.go @@ -19,9 +19,9 @@ const ( func TestScheduleInit(t *testing.T) { sc := newSchedule(initHeight) - assert.Equal(t, sc.getStateAtHeight(initHeight), blockStateNew) - assert.Equal(t, sc.getStateAtHeight(initHeight+1), blockStateProcessed) - assert.Equal(t, sc.getStateAtHeight(initHeight-1), blockStateUnknown) + assert.Equal(t, blockStateNew, sc.getStateAtHeight(initHeight)) + assert.Equal(t, blockStateProcessed, sc.getStateAtHeight(initHeight-1)) + assert.Equal(t, blockStateUnknown, sc.getStateAtHeight(initHeight+1)) } func TestAddPeer(t *testing.T) { @@ -44,8 +44,9 @@ func TestTouchPeer(t *testing.T) { assert.Nil(t, sc.touchPeer(peerID, now), "Touching a peer should return no error") + // the peer should threshold := 10 * time.Second - assert.Equal(t, len(sc.peersSince(threshold, now.Add(9*time.Second))), 0, + assert.Equal(t, 0, len(sc.peersSince(threshold, now.Add(9*time.Second))), "Expected no peers to have been touched over 9 seconds") assert.Equal(t, peerID, sc.peersSince(threshold, now.Add(11*time.Second))[0].peerID, "Expected one peer to have been touched over 10 seconds ago") @@ -54,6 +55,8 @@ func TestTouchPeer(t *testing.T) { func TestPeerHeight(t *testing.T) { sc := newSchedule(initHeight) + assert.Nil(t, sc.addPeer(peerID), + "Adding a peer should return no error") assert.Nil(t, sc.setPeerHeight(peerID, peerHeight)) for i := initHeight; i <= peerHeight; i++ { assert.Equal(t, sc.getStateAtHeight(i), blockStateNew, @@ -82,17 +85,18 @@ func TestHeightFSM(t *testing.T) { assert.Equal(t, blockStateUnknown, sc.getStateAtHeight(peerHeight+10), "Expected the maximum height seen + 10 to be in blockStateUnknown") + assert.Equal(t, errBadSchedule, sc.markPending(peerID, peerHeight+10, now.Add(1*time.Second)), "Expected markPending on block in blockStateUnknown height to fail") - assert.Nil(t, sc.markPending(peerID, initHeight, now.Add(1*time.Second)), "Expected markPending on a known height with a known peer to return no error") assert.Equal(t, blockStatePending, sc.getStateAtHeight(initHeight), "Expected a the markedBlock to be in blockStatePending") assert.Nil(t, sc.markReceived(peerID, initHeight, blockSize, now.Add(2*time.Second)), - "Expected a marking a pending block received to return no error") + "Expected marking markReceived on a pending block to return no error") + // here we are trying to reset blocks that were received assert.Nil(t, sc.resetBlocks(peerID), "Expected resetBlocks to return no error") assert.Equal(t, blockStateNew, sc.getStateAtHeight(initHeight), @@ -103,7 +107,7 @@ func TestHeightFSM(t *testing.T) { assert.Equal(t, blockStatePending, sc.getStateAtHeight(initHeight), "Expected block to be in blockStatePending") - assert.Nil(t, sc.markReceived(peerID, blockSize, initHeight, now.Add(2*time.Second)), + assert.Nil(t, sc.markReceived(peerID, initHeight, blockSize, now.Add(2*time.Second)), "Expected marking a pending block as received to return no error") assert.Equal(t, blockStateReceived, sc.getStateAtHeight(initHeight))