fixed pool, using locks now.

This commit is contained in:
Jae Kwon
2015-03-24 11:02:30 -07:00
parent a4606f1c5e
commit 9703d34b65
2 changed files with 347 additions and 315 deletions

View File

@ -1,7 +1,7 @@
package blockchain package blockchain
import ( import (
"math/rand" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -11,308 +11,308 @@ import (
const ( const (
maxOutstandingRequestsPerPeer = 10 maxOutstandingRequestsPerPeer = 10
eventsChannelCapacity = 100 inputsChannelCapacity = 100
maxTries = 3 maxTries = 3
requestIntervalMS = 500 requestIntervalMS = 500
requestBatchSize = 50 requestBatchSize = 50
maxPendingRequests = 50 maxPendingRequests = 50
maxTotalRequests = 100 maxTotalRequests = 100
maxPeersPerRequest = 1 maxRequestsPerPeer = 20
) )
var ( var (
requestTimeoutSeconds = time.Duration(10) requestTimeoutSeconds = time.Duration(1)
) )
type BlockRequest struct {
Height uint
PeerId string
}
type BlockPool struct { type BlockPool struct {
peers map[string]*bpPeer // block requests
blockInfos map[uint]*bpBlockInfo requestsMtx sync.Mutex
height uint // the lowest key in blockInfos. requests map[uint]*bpRequest
started int32 // atomic height uint // the lowest key in requests.
stopped int32 // atomic
numPending int32 numPending int32
numTotal int32 numTotal int32
eventsCh chan interface{} // internal events.
requestsCh chan<- BlockRequest // output of new requests to make. // peers
timeoutsCh chan<- string // output of peers that timed out. peersMtx sync.Mutex
blocksCh chan<- *types.Block // output of ordered blocks. peers map[string]*bpPeer
repeater *RepeatTimer // for requesting more bocks.
quit chan struct{} requestsCh chan<- BlockRequest
timeoutsCh chan<- string
repeater *RepeatTimer
running int32 // atomic
} }
func NewBlockPool(start uint, timeoutsCh chan<- string, requestsCh chan<- BlockRequest, blocksCh chan<- *types.Block) *BlockPool { func NewBlockPool(start uint, requestsCh chan<- BlockRequest, timeoutsCh chan<- string) *BlockPool {
return &BlockPool{ return &BlockPool{
peers: make(map[string]*bpPeer), peers: make(map[string]*bpPeer),
blockInfos: make(map[uint]*bpBlockInfo),
requests: make(map[uint]*bpRequest),
height: start, height: start,
started: 0,
stopped: 0,
numPending: 0, numPending: 0,
numTotal: 0, numTotal: 0,
quit: make(chan struct{}),
eventsCh: make(chan interface{}, eventsChannelCapacity),
requestsCh: requestsCh, requestsCh: requestsCh,
timeoutsCh: timeoutsCh, timeoutsCh: timeoutsCh,
blocksCh: blocksCh,
repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond), repeater: NewRepeatTimer("", requestIntervalMS*time.Millisecond),
running: 0,
} }
} }
func (bp *BlockPool) Start() { func (bp *BlockPool) Start() {
if atomic.CompareAndSwapInt32(&bp.started, 0, 1) { if atomic.CompareAndSwapInt32(&bp.running, 0, 1) {
log.Info("Starting BlockPool") log.Info("Starting BlockPool")
go bp.run() go bp.run()
} }
} }
func (bp *BlockPool) Stop() { func (bp *BlockPool) Stop() {
if atomic.CompareAndSwapInt32(&bp.stopped, 0, 1) { if atomic.CompareAndSwapInt32(&bp.running, 1, 0) {
log.Info("Stopping BlockPool") log.Info("Stopping BlockPool")
close(bp.quit)
close(bp.eventsCh)
close(bp.requestsCh)
close(bp.timeoutsCh)
close(bp.blocksCh)
bp.repeater.Stop() bp.repeater.Stop()
} }
} }
// AddBlock should be called when a block is received. func (bp *BlockPool) IsRunning() bool {
func (bp *BlockPool) AddBlock(block *types.Block, peerId string) { return atomic.LoadInt32(&bp.running) == 1
bp.eventsCh <- bpBlockResponse{block, peerId}
} }
func (bp *BlockPool) SetPeerStatus(peerId string, height uint) { // Run spawns requests as needed.
bp.eventsCh <- bpPeerStatus{peerId, height}
}
// Runs in a goroutine and processes messages.
func (bp *BlockPool) run() { func (bp *BlockPool) run() {
FOR_LOOP: RUN_LOOP:
for { for {
select { if atomic.LoadInt32(&bp.running) == 0 {
case msg := <-bp.eventsCh: break RUN_LOOP
bp.handleEvent(msg)
case <-bp.repeater.Ch:
bp.makeMoreBlockInfos()
bp.requestBlocksFromRandomPeers(10)
case <-bp.quit:
break FOR_LOOP
}
}
}
func (bp *BlockPool) handleEvent(event_ interface{}) {
switch event := event_.(type) {
case bpBlockResponse:
peer := bp.peers[event.peerId]
blockInfo := bp.blockInfos[event.block.Height]
if blockInfo == nil {
// block was unwanted.
if peer != nil {
peer.bad++
} }
height, numPending, numTotal := bp.GetStatus()
log.Debug("BlockPool.run", "height", height, "numPending", numPending,
"numTotal", numTotal)
if numPending >= maxPendingRequests {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
} else if numTotal >= maxTotalRequests {
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
} else { } else {
// block was wanted. // request for more blocks.
if peer != nil { height := bp.nextHeight()
peer.good++ bp.makeRequest(height)
}
delete(peer.requests, event.block.Height)
if blockInfo.block == nil {
// peer is the first to give it to us.
blockInfo.block = event.block
blockInfo.blockBy = peer.id
bp.numPending--
if event.block.Height == bp.height {
go bp.pushBlocksFromStart()
} }
} }
}
func (bp *BlockPool) GetStatus() (uint, int32, int32) {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
return bp.height, bp.numPending, bp.numTotal
}
// We need to see the second block's Validation to validate the first block.
// So we peek two blocks at a time.
func (bp *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
if r := bp.requests[bp.height]; r != nil {
first = r.block
} }
case bpPeerStatus: // updated or new status from peer if r := bp.requests[bp.height+1]; r != nil {
// request blocks if possible. second = r.block
peer := bp.peers[event.peerId]
if peer == nil {
peer = bpNewPeer(event.peerId, event.height)
bp.peers[peer.id] = peer
} }
bp.requestBlocksFromPeer(peer)
case bpRequestTimeout: // unconditional timeout for each peer's request.
peer := bp.peers[event.peerId]
if peer == nil {
// cleanup was already handled.
return return
}
height := event.height
request := peer.requests[height]
if request == nil || request.block != nil {
// the request was fulfilled by some peer or this peer.
return
}
// A request for peer timed out.
peer.bad++
if request.tries < maxTries {
log.Warn("Timeout: Trying again.", "tries", request.tries, "peerId", peer.id)
// try again.
select {
case bp.requestsCh <- BlockRequest{height, peer.id}:
request.startAndTimeoutTo(bp.eventsCh) // also bumps request.tries
default:
// The request cannot be made because requestCh is full.
// Just delete the request.
delete(peer.requests, height)
}
} else {
log.Warn("Timeout: Deleting request")
// delete the request.
delete(peer.requests, height)
blockInfo := bp.blockInfos[height]
if blockInfo != nil {
delete(blockInfo.requests, peer.id)
}
select {
case bp.timeoutsCh <- peer.id:
default:
}
}
}
} }
// NOTE: This function is sufficient, but we should find pending blocks // Pop the first block at bp.height
// and sample the peers in one go rather than the current O(n^2) impl. // It must have been validated by 'second'.Validation from PeekTwoBlocks().
func (bp *BlockPool) requestBlocksFromRandomPeers(maxPeers int) { func (bp *BlockPool) PopRequest() {
chosen := bp.pickAvailablePeers(maxPeers) bp.requestsMtx.Lock() // Lock
log.Debug("requestBlocksFromRandomPeers", "chosen", len(chosen)) defer bp.requestsMtx.Unlock()
for _, peer := range chosen {
bp.requestBlocksFromPeer(peer)
}
}
func (bp *BlockPool) requestBlocksFromPeer(peer *bpPeer) { if r := bp.requests[bp.height]; r == nil || r.block == nil {
// If peer is available and can provide something... panic("PopRequest() requires a valid block")
for height := bp.height; peer.available(); height++ {
blockInfo := bp.blockInfos[height]
if blockInfo == nil {
// We're out of range.
return
} }
needsMorePeers := blockInfo.needsMorePeers()
alreadyAskedPeer := blockInfo.requests[peer.id] != nil
if needsMorePeers && !alreadyAskedPeer {
select {
case bp.requestsCh <- BlockRequest{height, peer.id}:
// Create a new request and start the timer.
request := &bpBlockRequest{
height: height,
peer: peer,
}
blockInfo.requests[peer.id] = request
peer.requests[height] = request
request.startAndTimeoutTo(bp.eventsCh) // also bumps request.tries
default:
// The request cannot be made because requestCh is full.
// Just stop.
return
}
}
}
}
func (bp *BlockPool) makeMoreBlockInfos() { delete(bp.requests, bp.height)
// make more requests if necessary.
for i := 0; i < requestBatchSize; i++ {
//log.Debug("Confused?",
// "numPending", bp.numPending, "maxPendingRequests", maxPendingRequests, "numtotal", bp.numTotal, "maxTotalRequests", maxTotalRequests)
if bp.numPending < maxPendingRequests && bp.numTotal < maxTotalRequests {
// Make a request for the next block height
requestHeight := bp.height + uint(bp.numTotal)
log.Debug("New blockInfo", "height", requestHeight)
blockInfo := bpNewBlockInfo(requestHeight)
bp.blockInfos[requestHeight] = blockInfo
bp.numPending++
bp.numTotal++
} else {
break
}
}
}
func (bp *BlockPool) pickAvailablePeers(choose int) []*bpPeer {
available := []*bpPeer{}
for _, peer := range bp.peers {
if peer.available() {
available = append(available, peer)
}
}
perm := rand.Perm(MinInt(choose, len(available)))
chosen := make([]*bpPeer, len(perm))
for i, idx := range perm {
chosen[i] = available[idx]
}
return chosen
}
// blocking
func (bp *BlockPool) pushBlocksFromStart() {
for height := bp.height; ; height++ {
// push block to blocksCh.
blockInfo := bp.blockInfos[height]
if blockInfo == nil || blockInfo.block == nil {
break
}
bp.numTotal--
bp.height++ bp.height++
delete(bp.blockInfos, height) bp.numTotal--
bp.blocksCh <- blockInfo.block }
// Invalidates the block at bp.height.
// Remove the peer and request from others.
func (bp *BlockPool) RedoRequest(height uint) {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
request := bp.requests[height]
if request.block == nil {
panic("Expected block to be non-nil")
} }
bp.removePeer(request.peerId)
request.block = nil
request.peerId = ""
bp.numPending++
go requestRoutine(bp, height)
} }
//----------------------------------------------------------------------------- func (bp *BlockPool) hasBlock(height uint) bool {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
type bpBlockInfo struct { request := bp.requests[height]
height uint return request != nil && request.block != nil
requests map[string]*bpBlockRequest
block *types.Block // first block received
blockBy string // peerId of source
} }
func bpNewBlockInfo(height uint) *bpBlockInfo { func (bp *BlockPool) setPeerForRequest(height uint, peerId string) {
return &bpBlockInfo{ bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
request := bp.requests[height]
if request == nil {
return
}
request.peerId = peerId
}
func (bp *BlockPool) AddBlock(block *types.Block, peerId string) {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
request := bp.requests[block.Height]
if request == nil {
return
}
if request.peerId != peerId {
return
}
if request.block != nil {
return
}
request.block = block
bp.numPending--
}
func (bp *BlockPool) getPeer(peerId string) *bpPeer {
bp.peersMtx.Lock() // Lock
defer bp.peersMtx.Unlock()
peer := bp.peers[peerId]
return peer
}
// Sets the peer's blockchain height.
func (bp *BlockPool) SetPeerHeight(peerId string, height uint) {
bp.peersMtx.Lock() // Lock
defer bp.peersMtx.Unlock()
peer := bp.peers[peerId]
if peer != nil {
peer.height = height
} else {
peer = &bpPeer{
height: height, height: height,
requests: make(map[string]*bpBlockRequest), id: peerId,
numRequests: 0,
}
bp.peers[peerId] = peer
} }
} }
func (blockInfo *bpBlockInfo) needsMorePeers() bool { func (bp *BlockPool) RemovePeer(peerId string) {
return len(blockInfo.requests) < maxPeersPerRequest bp.peersMtx.Lock() // Lock
defer bp.peersMtx.Unlock()
delete(bp.peers, peerId)
} }
//------------------------------------- // Pick an available peer with at least the given minHeight.
// If no peers are available, returns nil.
func (bp *BlockPool) pickIncrAvailablePeer(minHeight uint) *bpPeer {
bp.peersMtx.Lock()
defer bp.peersMtx.Unlock()
type bpBlockRequest struct { for _, peer := range bp.peers {
peer *bpPeer if peer.numRequests >= maxRequestsPerPeer {
height uint continue
block *types.Block
tries int
}
// bump tries++ and set timeout.
// NOTE: the timer is unconditional.
func (request *bpBlockRequest) startAndTimeoutTo(eventsCh chan<- interface{}) {
request.tries++
time.AfterFunc(requestTimeoutSeconds*time.Second, func() {
eventsCh <- bpRequestTimeout{
peerId: request.peer.id,
height: request.height,
} }
}) if peer.height < minHeight {
continue
}
peer.numRequests++
return peer
}
return nil
}
func (bp *BlockPool) decrPeer(peerId string) {
bp.peersMtx.Lock()
defer bp.peersMtx.Unlock()
peer := bp.peers[peerId]
if peer == nil {
return
}
peer.numRequests--
}
func (bp *BlockPool) nextHeight() uint {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
return bp.height + uint(bp.numTotal)
}
func (bp *BlockPool) makeRequest(height uint) {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
request := &bpRequest{
height: height,
peerId: "",
block: nil,
}
bp.requests[height] = request
nextHeight := bp.height + uint(bp.numTotal)
if nextHeight == height {
bp.numTotal++
bp.numPending++
}
go requestRoutine(bp, height)
}
func (bp *BlockPool) sendRequest(height uint, peerId string) {
if atomic.LoadInt32(&bp.running) == 0 {
return
}
bp.requestsCh <- BlockRequest{height, peerId}
}
func (bp *BlockPool) sendTimeout(peerId string) {
if atomic.LoadInt32(&bp.running) == 0 {
return
}
bp.timeoutsCh <- peerId
}
func (bp *BlockPool) debug() string {
bp.requestsMtx.Lock() // Lock
defer bp.requestsMtx.Unlock()
str := ""
for h := bp.height; h < bp.height+uint(bp.numTotal); h++ {
if bp.requests[h] == nil {
str += Fmt("H(%v):X ", h)
} else {
str += Fmt("H(%v):", h)
str += Fmt("B?(%v) ", bp.requests[h].block != nil)
}
}
return str
} }
//------------------------------------- //-------------------------------------
@ -320,38 +320,59 @@ func (request *bpBlockRequest) startAndTimeoutTo(eventsCh chan<- interface{}) {
type bpPeer struct { type bpPeer struct {
id string id string
height uint height uint
requests map[uint]*bpBlockRequest numRequests int32
// Count good/bad events from peer.
good uint
bad uint
} }
func bpNewPeer(peerId string, height uint) *bpPeer { type bpRequest struct {
return &bpPeer{ height uint
id: peerId, peerId string
height: height, block *types.Block
requests: make(map[uint]*bpBlockRequest),
}
}
func (peer *bpPeer) available() bool {
return len(peer.requests) < maxOutstandingRequestsPerPeer
} }
//------------------------------------- //-------------------------------------
// bp.eventsCh messages
type bpBlockResponse struct { // Responsible for making more requests as necessary
block *types.Block // Returns when a block is found (e.g. AddBlock() is called)
peerId string func requestRoutine(bp *BlockPool, height uint) {
for {
var peer *bpPeer = nil
PICK_LOOP:
for {
if !bp.IsRunning() {
return
}
peer = bp.pickIncrAvailablePeer(height)
if peer == nil {
time.Sleep(requestIntervalMS * time.Millisecond)
continue PICK_LOOP
}
break PICK_LOOP
}
bp.setPeerForRequest(height, peer.id)
for try := 0; try < maxTries; try++ {
bp.sendRequest(height, peer.id)
time.Sleep(requestTimeoutSeconds * time.Second)
if bp.hasBlock(height) {
bp.decrPeer(peer.id)
return
}
bpHeight, _, _ := bp.GetStatus()
if height < bpHeight {
bp.decrPeer(peer.id)
return
}
}
bp.RemovePeer(peer.id)
bp.sendTimeout(peer.id)
}
} }
type bpPeerStatus struct { //-------------------------------------
peerId string
height uint // blockchain tip of peer
}
type bpRequestTimeout struct { type BlockRequest struct {
peerId string Height uint
height uint PeerId string
} }

View File

@ -25,26 +25,34 @@ func makePeers(numPeers int, minHeight, maxHeight uint) map[string]testPeer {
} }
func TestBasic(t *testing.T) { func TestBasic(t *testing.T) {
// 100 peers anywhere at height 0 to 1000. peers := makePeers(10, 0, 1000)
peers := makePeers(100, 0, 1000)
start := uint(42) start := uint(42)
maxHeight := uint(300)
timeoutsCh := make(chan string, 100) timeoutsCh := make(chan string, 100)
requestsCh := make(chan BlockRequest, 100) requestsCh := make(chan BlockRequest, 100)
blocksCh := make(chan *types.Block, 100) pool := NewBlockPool(start, requestsCh, timeoutsCh)
pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh)
pool.Start() pool.Start()
// Introduce each peer. // Introduce each peer.
go func() { go func() {
for _, peer := range peers { for _, peer := range peers {
pool.SetPeerStatus(peer.id, peer.height) pool.SetPeerHeight(peer.id, peer.height)
} }
}() }()
lastSeenBlock := uint(41) // Start a goroutine to pull blocks
go func() {
for {
if !pool.IsRunning() {
return
}
first, second := pool.PeekTwoBlocks()
if first != nil && second != nil {
pool.PopRequest()
} else {
time.Sleep(1 * time.Second)
}
}
}()
// Pull from channels // Pull from channels
for { for {
@ -53,21 +61,15 @@ func TestBasic(t *testing.T) {
t.Errorf("timeout: %v", peerId) t.Errorf("timeout: %v", peerId)
case request := <-requestsCh: case request := <-requestsCh:
log.Debug("TEST: Pulled new BlockRequest", "request", request) log.Debug("TEST: Pulled new BlockRequest", "request", request)
// After a while, pretend like we got a block from the peer. if request.Height == 300 {
return // Done!
}
// Request desired, pretend like we got the block immediately.
go func() { go func() {
block := &types.Block{Header: &types.Header{Height: request.Height}} block := &types.Block{Header: &types.Header{Height: request.Height}}
pool.AddBlock(block, request.PeerId) pool.AddBlock(block, request.PeerId)
log.Debug("TEST: Added block", "block", request.Height, "peer", request.PeerId) log.Debug("TEST: Added block", "block", request.Height, "peer", request.PeerId)
}() }()
case block := <-blocksCh:
log.Debug("TEST: Pulled new Block", "height", block.Height)
if block.Height != lastSeenBlock+1 {
t.Fatalf("Wrong order of blocks seen. Expected: %v Got: %v", lastSeenBlock+1, block.Height)
}
lastSeenBlock++
if block.Height == maxHeight {
return // Done!
}
} }
} }
@ -75,43 +77,52 @@ func TestBasic(t *testing.T) {
} }
func TestTimeout(t *testing.T) { func TestTimeout(t *testing.T) {
origRequestTimeoutSeconds := requestTimeoutSeconds peers := makePeers(10, 0, 1000)
requestTimeoutSeconds = time.Duration(0)
defer func() { requestTimeoutSeconds = origRequestTimeoutSeconds }()
peers := makePeers(100, 0, 1000)
start := uint(42) start := uint(42)
timeoutsCh := make(chan string, 10) timeoutsCh := make(chan string, 100)
requestsCh := make(chan BlockRequest, 10) requestsCh := make(chan BlockRequest, 100)
blocksCh := make(chan *types.Block, 100) pool := NewBlockPool(start, requestsCh, timeoutsCh)
pool := NewBlockPool(start, timeoutsCh, requestsCh, blocksCh)
pool.Start() pool.Start()
// Introduce each peer. // Introduce each peer.
go func() { go func() {
for _, peer := range peers { for _, peer := range peers {
pool.SetPeerStatus(peer.id, peer.height) pool.SetPeerHeight(peer.id, peer.height)
}
}()
// Start a goroutine to pull blocks
go func() {
for {
if !pool.IsRunning() {
return
}
first, second := pool.PeekTwoBlocks()
if first != nil && second != nil {
pool.PopRequest()
} else {
time.Sleep(1 * time.Second)
}
} }
}() }()
// Pull from channels // Pull from channels
counter := 0
timedOut := map[string]struct{}{}
for { for {
select { select {
case peerId := <-timeoutsCh: case peerId := <-timeoutsCh:
// Timed out. Done! log.Debug("Timeout", "peerId", peerId)
if peers[peerId].id != peerId { if _, ok := timedOut[peerId]; !ok {
t.Errorf("Unexpected peer from timeoutsCh") counter++
if counter == len(peers) {
return // Done!
} }
return }
case _ = <-requestsCh: case request := <-requestsCh:
// Don't do anything, let it time out. log.Debug("TEST: Pulled new BlockRequest", "request", request)
case _ = <-blocksCh:
t.Errorf("Got block when none expected")
return
} }
} }
pool.Stop() pool.Stop()
} }