Merge remote-tracking branch 'orijtech/p2p-full-test-PeerSet' into develop

This commit is contained in:
Ethan Buchman 2017-09-06 16:40:07 -04:00
commit cb80ab2965
2 changed files with 107 additions and 21 deletions

View File

@ -27,6 +27,7 @@ type peerSetItem struct {
index int index int
} }
// NewPeerSet creates a new peerSet with a list of initial capacity of 256 items.
func NewPeerSet() *PeerSet { func NewPeerSet() *PeerSet {
return &PeerSet{ return &PeerSet{
lookup: make(map[string]*peerSetItem), lookup: make(map[string]*peerSetItem),
@ -34,7 +35,8 @@ func NewPeerSet() *PeerSet {
} }
} }
// Returns false if peer with key (PubKeyEd25519) is already set // Add returns false if the peer's Key (PubKeyEd25519) is already memoized.
// If the peer was already added, it returns ErrSwitchDuplicatePeer.
func (ps *PeerSet) Add(peer *Peer) error { func (ps *PeerSet) Add(peer *Peer) error {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -50,13 +52,16 @@ func (ps *PeerSet) Add(peer *Peer) error {
return nil return nil
} }
// Has returns true iff the peerset contains
// the peer referred to by this peerKey.
func (ps *PeerSet) Has(peerKey string) bool { func (ps *PeerSet) Has(peerKey string) bool {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock()
_, ok := ps.lookup[peerKey] _, ok := ps.lookup[peerKey]
ps.mtx.Unlock()
return ok return ok
} }
// Get looks up a peer by the provided peerKey.
func (ps *PeerSet) Get(peerKey string) *Peer { func (ps *PeerSet) Get(peerKey string) *Peer {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -68,6 +73,7 @@ func (ps *PeerSet) Get(peerKey string) *Peer {
} }
} }
// Remove discards peer by its Key, if the peer was previously memoized.
func (ps *PeerSet) Remove(peer *Peer) { func (ps *PeerSet) Remove(peer *Peer) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -77,8 +83,8 @@ func (ps *PeerSet) Remove(peer *Peer) {
} }
index := item.index index := item.index
// Copy the list but without the last element. // Create a new copy of the list but with one less item.
// (we must copy because we're mutating the list) // (we must copy because we'll be mutating the list).
newList := make([]*Peer, len(ps.list)-1) newList := make([]*Peer, len(ps.list)-1)
copy(newList, ps.list) copy(newList, ps.list)
// If it's the last peer, that's an easy special case. // If it's the last peer, that's an easy special case.
@ -88,7 +94,7 @@ func (ps *PeerSet) Remove(peer *Peer) {
return return
} }
// Move the last item from ps.list to "index" in list. // Replace the popped item with the last item in the old list.
lastPeer := ps.list[len(ps.list)-1] lastPeer := ps.list[len(ps.list)-1]
lastPeerKey := lastPeer.Key lastPeerKey := lastPeer.Key
lastPeerItem := ps.lookup[lastPeerKey] lastPeerItem := ps.lookup[lastPeerKey]
@ -96,16 +102,16 @@ func (ps *PeerSet) Remove(peer *Peer) {
lastPeerItem.index = index lastPeerItem.index = index
ps.list = newList ps.list = newList
delete(ps.lookup, peer.Key) delete(ps.lookup, peer.Key)
} }
// Size returns the number of unique items in the peerSet.
func (ps *PeerSet) Size() int { func (ps *PeerSet) Size() int {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
return len(ps.list) return len(ps.list)
} }
// threadsafe list of peers. // List returns the threadsafe list of peers.
func (ps *PeerSet) List() []*Peer { func (ps *PeerSet) List() []*Peer {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()

View File

@ -2,8 +2,11 @@ package p2p
import ( import (
"math/rand" "math/rand"
"sync"
"testing" "testing"
"github.com/stretchr/testify/assert"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
) )
@ -18,28 +21,47 @@ func randPeer() *Peer {
} }
} }
func TestAddRemoveOne(t *testing.T) { func TestPeerSetAddRemoveOne(t *testing.T) {
t.Parallel()
peerSet := NewPeerSet() peerSet := NewPeerSet()
peer := randPeer() var peerList []*Peer
err := peerSet.Add(peer) for i := 0; i < 5; i++ {
if err != nil { p := randPeer()
t.Errorf("Failed to add new peer") peerSet.Add(p)
} peerList = append(peerList, p)
if peerSet.Size() != 1 {
t.Errorf("Failed to add new peer and increment size")
} }
peerSet.Remove(peer) n := len(peerList)
if peerSet.Has(peer.Key) { // 1. Test removing from the front
t.Errorf("Failed to remove peer") for i, peerAtFront := range peerList {
peerSet.Remove(peerAtFront)
wantSize := n - i - 1
for j := 0; j < 2; j++ {
assert.Equal(t, false, peerSet.Has(peerAtFront.Key), "#%d Run #%d: failed to remove peer", i, j)
assert.Equal(t, wantSize, peerSet.Size(), "#%d Run #%d: failed to remove peer and decrement size", i, j)
// Test the route of removing the now non-existent element
peerSet.Remove(peerAtFront)
}
} }
if peerSet.Size() != 0 {
t.Errorf("Failed to remove peer and decrement size") // 2. Next we are testing removing the peer at the end
// a) Replenish the peerSet
for _, peer := range peerList {
peerSet.Add(peer)
}
// b) In reverse, remove each element
for i := n - 1; i >= 0; i-- {
peerAtEnd := peerList[i]
peerSet.Remove(peerAtEnd)
assert.Equal(t, false, peerSet.Has(peerAtEnd.Key), "#%d: failed to remove item at end", i)
assert.Equal(t, i, peerSet.Size(), "#%d: differing sizes after peerSet.Remove(atEndPeer)", i)
} }
} }
func TestAddRemoveMany(t *testing.T) { func TestPeerSetAddRemoveMany(t *testing.T) {
t.Parallel()
peerSet := NewPeerSet() peerSet := NewPeerSet()
peers := []*Peer{} peers := []*Peer{}
@ -65,3 +87,61 @@ func TestAddRemoveMany(t *testing.T) {
} }
} }
} }
func TestPeerSetAddDuplicate(t *testing.T) {
t.Parallel()
peerSet := NewPeerSet()
peer := randPeer()
n := 20
errsChan := make(chan error)
// Add the same asynchronously to test the
// concurrent guarantees of our APIs, and
// our expectation in the end is that only
// one addition succeeded, but the rest are
// instances of ErrSwitchDuplicatePeer.
for i := 0; i < n; i++ {
go func() {
errsChan <- peerSet.Add(peer)
}()
}
// Now collect and tally the results
errsTally := make(map[error]int)
for i := 0; i < n; i++ {
err := <-errsChan
errsTally[err] += 1
}
// Our next procedure is to ensure that only one addition
// succeeded and that the rest are each ErrSwitchDuplicatePeer.
wantErrCount, gotErrCount := n-1, errsTally[ErrSwitchDuplicatePeer]
assert.Equal(t, wantErrCount, gotErrCount, "invalid ErrSwitchDuplicatePeer count")
wantNilErrCount, gotNilErrCount := 1, errsTally[nil]
assert.Equal(t, wantNilErrCount, gotNilErrCount, "invalid nil errCount")
}
func TestPeerSetGet(t *testing.T) {
t.Parallel()
peerSet := NewPeerSet()
peer := randPeer()
assert.Nil(t, peerSet.Get(peer.Key), "expecting a nil lookup, before .Add")
if err := peerSet.Add(peer); err != nil {
t.Fatalf("Failed to add new peer: %v", err)
}
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
// Add them asynchronously to test the
// concurrent guarantees of our APIs.
wg.Add(1)
go func(i int) {
defer wg.Done()
got, want := peerSet.Get(peer.Key), peer
assert.Equal(t, got, want, "#%d: got=%v want=%v", i, got, want)
}(i)
}
wg.Wait()
}