Reactors can be stopped or started at any time.

This commit is contained in:
Jae Kwon
2015-03-25 00:15:18 -07:00
parent 612f8bab9d
commit 08a83aa9fb
13 changed files with 340 additions and 96 deletions

View File

@@ -149,7 +149,7 @@ func (bp *BlockPool) RedoRequest(height uint) {
if request.block == nil {
panic("Expected block to be non-nil")
}
bp.removePeer(request.peerId)
bp.RemovePeer(request.peerId) // Lock on peersMtx.
request.block = nil
request.peerId = ""
bp.numPending++

226
blockchain/reactor.go Normal file
View File

@@ -0,0 +1,226 @@
package blockchain
import (
"bytes"
"errors"
"fmt"
"sync/atomic"
"time"
"github.com/tendermint/tendermint/binary"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/types"
)
const (
BlockchainChannel = byte(0x40)
defaultChannelCapacity = 100
defaultSleepIntervalMS = 500
)
// BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct {
sw *p2p.Switch
store *BlockStore
pool *BlockPool
requestsCh chan BlockRequest
timeoutsCh chan string
lastBlock *types.Block
quit chan struct{}
started uint32
stopped uint32
}
func NewBlockchainReactor(store *BlockStore) *BlockchainReactor {
requestsCh := make(chan BlockRequest, defaultChannelCapacity)
timeoutsCh := make(chan string, defaultChannelCapacity)
pool := NewBlockPool(
store.Height()+1,
requestsCh,
timeoutsCh,
)
bcR := &BlockchainReactor{
store: store,
pool: pool,
requestsCh: requestsCh,
timeoutsCh: timeoutsCh,
quit: make(chan struct{}),
started: 0,
stopped: 0,
}
return bcR
}
// Implements Reactor
func (bcR *BlockchainReactor) Start(sw *p2p.Switch) {
if atomic.CompareAndSwapUint32(&bcR.started, 0, 1) {
log.Info("Starting BlockchainReactor")
bcR.sw = sw
bcR.pool.Start()
go bcR.poolRoutine()
}
}
// Implements Reactor
func (bcR *BlockchainReactor) Stop() {
if atomic.CompareAndSwapUint32(&bcR.stopped, 0, 1) {
log.Info("Stopping BlockchainReactor")
close(bcR.quit)
bcR.pool.Stop()
}
}
// Implements Reactor
func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
return []*p2p.ChannelDescriptor{
&p2p.ChannelDescriptor{
Id: BlockchainChannel,
Priority: 5,
SendQueueCapacity: 20, // Queue 20 blocks to send to a peer.
},
}
}
// Implements Reactor
func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
// Send peer our state.
peer.Send(BlockchainChannel, PeerStatusMessage{bcR.store.Height()})
}
// Implements Reactor
func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
// Remove peer from the pool.
bcR.pool.RemovePeer(peer.Key)
}
// Implements Reactor
func (bcR *BlockchainReactor) Receive(chId byte, src *p2p.Peer, msgBytes []byte) {
_, msg_, err := DecodeMessage(msgBytes)
if err != nil {
log.Warn("Error decoding message", "error", err)
return
}
log.Info("BlockchainReactor received message", "msg", msg_)
switch msg := msg_.(type) {
case BlockRequestMessage:
log.Debug("Got BlockRequest", "msg", msg)
// Got a request for a block. Respond with block if we have it.
block := bcR.store.LoadBlock(msg.Height)
if block != nil {
msg := BlockResponseMessage{Block: block}
queued := src.TrySend(BlockchainChannel, msg)
if !queued {
// queue is full, just ignore.
}
} else {
// TODO peer is asking for things we don't have.
}
case BlockResponseMessage:
log.Debug("Got BlockResponse", "msg", msg)
// Got a block.
bcR.pool.AddBlock(msg.Block, src.Key)
case PeerStatusMessage:
log.Debug("Got PeerStatus", "msg", msg)
// Got a peer status.
bcR.pool.SetPeerHeight(src.Key, msg.Height)
default:
// Ignore unknown message
}
}
func (bcR *BlockchainReactor) poolRoutine() {
FOR_LOOP:
for {
select {
case request := <-bcR.requestsCh: // chan BlockRequest
peer := bcR.sw.Peers().Get(request.PeerId)
if peer == nil {
// We can't fulfill the request.
continue FOR_LOOP
}
msg := BlockRequestMessage{request.Height}
queued := peer.TrySend(BlockchainChannel, msg)
if !queued {
// We couldn't queue the request.
time.Sleep(defaultSleepIntervalMS * time.Millisecond)
continue FOR_LOOP
}
case peerId := <-bcR.timeoutsCh: // chan string
// Peer timed out.
peer := bcR.sw.Peers().Get(peerId)
bcR.sw.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
case <-bcR.quit:
break FOR_LOOP
}
}
}
func (bcR *BlockchainReactor) BroadcastStatus() error {
bcR.sw.Broadcast(BlockchainChannel, PeerStatusMessage{bcR.store.Height()})
return nil
}
//-----------------------------------------------------------------------------
// Messages
const (
msgTypeUnknown = byte(0x00)
msgTypeBlockRequest = byte(0x10)
msgTypeBlockResponse = byte(0x11)
msgTypePeerStatus = byte(0x20)
)
// TODO: check for unnecessary extra bytes at the end.
func DecodeMessage(bz []byte) (msgType byte, msg interface{}, err error) {
n := new(int64)
msgType = bz[0]
r := bytes.NewReader(bz)
switch msgType {
case msgTypeBlockRequest:
msg = binary.ReadBinary(BlockRequestMessage{}, r, n, &err)
case msgTypeBlockResponse:
msg = binary.ReadBinary(BlockResponseMessage{}, r, n, &err)
case msgTypePeerStatus:
msg = binary.ReadBinary(PeerStatusMessage{}, r, n, &err)
default:
msg = nil
}
return
}
//-------------------------------------
type BlockRequestMessage struct {
Height uint
}
func (m BlockRequestMessage) TypeByte() byte { return msgTypeBlockRequest }
func (m BlockRequestMessage) String() string {
return fmt.Sprintf("[BlockRequestMessage %v]", m.Height)
}
//-------------------------------------
type BlockResponseMessage struct {
Block *types.Block
}
func (m BlockResponseMessage) TypeByte() byte { return msgTypeBlockResponse }
func (m BlockResponseMessage) String() string {
return fmt.Sprintf("[BlockResponseMessage %v]", m.Block.Height)
}
//-------------------------------------
type PeerStatusMessage struct {
Height uint
}
func (m PeerStatusMessage) TypeByte() byte { return msgTypePeerStatus }
func (m PeerStatusMessage) String() string {
return fmt.Sprintf("[PeerStatusMessage %v]", m.Height)
}

View File

@@ -57,7 +57,7 @@ func (bs *BlockStore) LoadBlock(height uint) *types.Block {
if r == nil {
panic(Fmt("Block does not exist at height %v", height))
}
meta := binary.ReadBinary(&BlockMeta{}, r, &n, &err).(*BlockMeta)
meta := binary.ReadBinary(&types.BlockMeta{}, r, &n, &err).(*types.BlockMeta)
if err != nil {
panic(Fmt("Error reading block meta: %v", err))
}
@@ -87,14 +87,14 @@ func (bs *BlockStore) LoadBlockPart(height uint, index uint) *types.Part {
return part
}
func (bs *BlockStore) LoadBlockMeta(height uint) *BlockMeta {
func (bs *BlockStore) LoadBlockMeta(height uint) *types.BlockMeta {
var n int64
var err error
r := bs.GetReader(calcBlockMetaKey(height))
if r == nil {
panic(Fmt("BlockMeta does not exist for height %v", height))
}
meta := binary.ReadBinary(&BlockMeta{}, r, &n, &err).(*BlockMeta)
meta := binary.ReadBinary(&types.BlockMeta{}, r, &n, &err).(*types.BlockMeta)
if err != nil {
panic(Fmt("Error reading block meta: %v", err))
}
@@ -150,7 +150,7 @@ func (bs *BlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, s
}
// Save block meta
meta := makeBlockMeta(block, blockParts)
meta := types.NewBlockMeta(block, blockParts)
metaBytes := binary.BinaryBytes(meta)
bs.db.Set(calcBlockMetaKey(height), metaBytes)
@@ -184,22 +184,6 @@ func (bs *BlockStore) saveBlockPart(height uint, index uint, part *types.Part) {
//-----------------------------------------------------------------------------
type BlockMeta struct {
Hash []byte // The block hash
Header *types.Header // The block's Header
Parts types.PartSetHeader // The PartSetHeader, for transfer
}
func makeBlockMeta(block *types.Block, blockParts *types.PartSet) *BlockMeta {
return &BlockMeta{
Hash: block.Hash(),
Header: block.Header,
Parts: blockParts.Header(),
}
}
//-----------------------------------------------------------------------------
func calcBlockMetaKey(height uint) []byte {
return []byte(fmt.Sprintf("H:%v", height))
}