move everything to blockManagerState; sim

This commit is contained in:
Jae Kwon
2014-08-03 15:50:28 -07:00
parent 666122861c
commit 8db5b7b614
7 changed files with 886 additions and 181 deletions

View File

@ -25,14 +25,21 @@ const (
msgTypeState = Byte(0x01) msgTypeState = Byte(0x01)
msgTypeRequest = Byte(0x02) msgTypeRequest = Byte(0x02)
msgTypeData = Byte(0x03) msgTypeData = Byte(0x03)
maxRequestsPerPeer = 2 // Maximum number of outstanding requests from peer.
maxRequestsPerData = 2 // Maximum number of outstanding requests of some data.
maxRequestAheadBlock = 5 // Maximum number of blocks to request ahead of current verified. Must be >= 1
defaultRequestTimeoutS =
timeoutRepeatTimerMS = 1000 // Handle timed out requests periodically
) )
/* /*
TODO: keep tabs on current active requests onPeerState.
TODO: keep a heap of dataRequests * their corresponding timeouts. TODO: keep a heap of dataRequests * their corresponding timeouts.
timeout dataRequests and update the peerState, timeout dataRequests and update the peerState,
TODO: when a data item has bene received successfully, update the peerState. TODO: need to keep track of progress, blocks are too large. or we need to chop into chunks.
ensure goroutine safety. TODO: need to validate blocks. :/
TODO: actually save the block.
*/ */
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
@ -42,10 +49,32 @@ const (
// TODO: allow for more types, such as specific transactions // TODO: allow for more types, such as specific transactions
) )
func computeDataKey(dataType byte, height uint64) string { type dataKey struct {
dataType byte
height uint64
}
func newDataKey(dataType byte, height uint64) dataKey {
return dataKey{dataType, height}
}
func readDataKey(r io.Reader) dataKey {
return dataKey{
dataType: ReadByte(r),
height: ReadUInt64(r),
}
}
func (dk dataKey) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteTo(dk.dataType, w, n, err)
n, err = WriteTo(dk.height, w, n, err)
return
}
func (dk dataKey) String() string {
switch dataType { switch dataType {
case dataTypeBlock: case dataTypeBlock:
return fmt.Sprintf("B%v", height) return dataKeyfmt.Sprintf("B%v", height)
default: default:
Panicf("Unknown datatype %X", dataType) Panicf("Unknown datatype %X", dataType)
return "" // should not happen return "" // should not happen
@ -55,27 +84,26 @@ func computeDataKey(dataType byte, height uint64) string {
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
type BlockManager struct { type BlockManager struct {
db *db_.LevelDB db *db_.LevelDB
sw *p2p.Switch sw *p2p.Switch
swEvents chan interface{} swEvents chan interface{}
state blockManagerState state *blockManagerState
dataStates map[string]*dataState // TODO: replace with CMap timeoutTimer *RepeatTimer
peerStates map[string]*peerState // TODO: replace with CMap quit chan struct{}
quit chan struct{} started uint32
started uint32 stopped uint32
stopped uint32
} }
func NewBlockManager(sw *p2p.Switch, db *db_.LevelDB) *BlockManager { func NewBlockManager(sw *p2p.Switch, db *db_.LevelDB) *BlockManager {
swEvents := make(chan interface{}) swEvents := make(chan interface{})
sw.AddEventListener("BlockManager.swEvents", swEvents) sw.AddEventListener("BlockManager.swEvents", swEvents)
bm := &BlockManager{ bm := &BlockManager{
db: db, db: db,
sw: sw, sw: sw,
swEvents: swEvents, swEvents: swEvents,
dataStates: make(map[string]*dataState), state: newBlockManagerState(),
peerStates: make(map[string]*peerState), timeoutTimer: NewRepeatTimer(timeoutRepeatTimerMS * time.Second),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
bm.loadState() bm.loadState()
return bm return bm
@ -85,6 +113,9 @@ func (bm *BlockManager) Start() {
if atomic.CompareAndSwapUint32(&bm.started, 0, 1) { if atomic.CompareAndSwapUint32(&bm.started, 0, 1) {
log.Info("Starting BlockManager") log.Info("Starting BlockManager")
go bm.switchEventsHandler() go bm.switchEventsHandler()
go bm.blocksInfoHandler()
go bm.blocksDataHandler()
go bm.requestTimeoutHandler()
} }
} }
@ -100,61 +131,32 @@ func (bm *BlockManager) Stop() {
// "request" is optional, it's the request response that supplied // "request" is optional, it's the request response that supplied
// the data. // the data.
func (bm *BlockManager) StoreBlock(block *Block, origin *dataRequest) { func (bm *BlockManager) StoreBlock(block *Block, origin *dataRequest) {
dataKey := computeDataKey(dataTypeBlock, uint64(block.Header.Height)) dataKey := newDataKey(dataTypeBlock, uint64(block.Header.Height))
// Remove dataState entry, we'll no longer request this.
_dataState := bm.dataStates[dataKey] // XXX actually save the block.
removedRequests := _dataState.removeRequestsForDataType(dataTypeBlock)
for _, request := range removedRequests { canceled, newHeight := bm.state.didGetDataFromPeer(dataKey, origin.peer)
// Notify peer that the request has been canceled.
if request.peer.Equals(origin.peer) { // Notify peers that the request has been canceled.
continue for _, request := range canceled {
} else { msg := &requestMessage{
// Send cancellation on blocksInfoCh channel key: dataKey,
msg := &requestMessage{ type_: requestTypeCanceled,
dataType: Byte(dataTypeBlock),
height: block.Header.Height,
canceled: Byte(0x01),
}
tm := p2p.TypedMessage{msgTypeRequest, msg}
request.peer.TrySend(blocksInfoCh, tm.Bytes())
} }
// Remove dataRequest from request.peer's peerState. tm := p2p.TypedMessage{msgTypeRequest, msg}
peerState := bm.peerStates[request.peer.Key] request.peer.TrySend(blocksInfoCh, tm.Bytes())
peerState.remoteDataRequest(request)
} }
// Update state
newContiguousHeight := bm.state.addData(dataTypeBlock, uint64(block.Header.Height))
// If we have new data that extends our contiguous range, then announce it. // If we have new data that extends our contiguous range, then announce it.
if newContiguousHeight { if newHeight {
bm.sw.Broadcast(blocksInfoCh, bm.state.stateMessage()) bm.sw.Broadcast(blocksInfoCh, bm.state.makeStateMessage())
} }
} }
func (bm *BlockManager) LoadData(dataType byte, height uint64) interface{} { func (bm *BlockManager) LoadBlock(height uint64) *Block {
panic("not yet implemented") panic("not yet implemented")
} }
func (bm *BlockManager) loadState() {
// Load the state
stateBytes := bm.db.Get(dbKeyState)
if stateBytes == nil {
log.Info("New BlockManager with no state")
} else {
err := json.Unmarshal(stateBytes, &bm.state)
if err != nil {
Panicf("Could not unmarshal state bytes: %X", stateBytes)
}
}
}
func (bm *BlockManager) saveState() {
stateBytes, err := json.Marshal(&bm.state)
if err != nil {
panic("Could not marshal state bytes")
}
bm.db.Set(dbKeyState, stateBytes)
}
// Handle peer new/done events // Handle peer new/done events
func (bm *BlockManager) switchEventsHandler() { func (bm *BlockManager) switchEventsHandler() {
for { for {
@ -165,8 +167,8 @@ func (bm *BlockManager) switchEventsHandler() {
switch swEvent.(type) { switch swEvent.(type) {
case p2p.SwitchEventNewPeer: case p2p.SwitchEventNewPeer:
event := swEvent.(p2p.SwitchEventNewPeer) event := swEvent.(p2p.SwitchEventNewPeer)
// Create entry in .peerStates // Create peerState for event.Peer
bm.peerStates[event.Peer.Key] = &peerState{} bm.state.createEntryForPeer(event.Peer)
// Share our state with event.Peer // Share our state with event.Peer
msg := &stateMessage{ msg := &stateMessage{
lastBlockHeight: UInt64(bm.state.lastBlockHeight), lastBlockHeight: UInt64(bm.state.lastBlockHeight),
@ -175,47 +177,122 @@ func (bm *BlockManager) switchEventsHandler() {
event.Peer.TrySend(blocksInfoCh, tm.Bytes()) event.Peer.TrySend(blocksInfoCh, tm.Bytes())
case p2p.SwitchEventDonePeer: case p2p.SwitchEventDonePeer:
event := swEvent.(p2p.SwitchEventDonePeer) event := swEvent.(p2p.SwitchEventDonePeer)
// Remove entry from .peerStates // Delete peerState for event.Peer
delete(bm.peerStates, event.Peer.Key) bm.state.deleteEntryForPeer(event.Peer)
default: default:
log.Warning("Unhandled switch event type") log.Warning("Unhandled switch event type")
} }
} }
} }
// Handle requests from the blocks channel // Handle requests/cancellations from the blocksInfo channel
func (bm *BlockManager) requestsHandler() { func (bm *BlockManager) blocksInfoHandler() {
for { for {
inMsg, ok := bm.sw.Receive(blocksInfoCh) inMsg, ok := bm.sw.Receive(blocksInfoCh)
if !ok { if !ok {
// Client has stopped break // Client has stopped
break
} }
// decode message
msg := decodeMessage(inMsg.Bytes) msg := decodeMessage(inMsg.Bytes)
log.Info("requestHandler received %v", msg) log.Info("blocksInfoHandler received %v", msg)
switch msg.(type) { switch msg.(type) {
case *stateMessage: case *stateMessage:
m := msg.(*stateMessage) m := msg.(*stateMessage)
peerState := bm.peerStates[inMsg.MConn.Peer.Key] peerState := bm.getPeerState(inMsg.MConn.Peer)
if peerState == nil { if peerState == nil {
continue // peer has since been disconnected. continue // peer has since been disconnected.
} }
peerState.applyStateMessage(m) newDataTypes := peerState.applyStateMessage(m)
// Consider requesting data. // Consider requesting data.
// 1. if has more validation and we want it // Does the peer claim to have something we want?
// 2. if has more txs and we want it FOR_LOOP:
// if peerState.estimatedCredit() >= averageBlock for _, newDataType := range newDataTypes {
// Are we already requesting too much data from peer?
// TODO: keep track of what we've requested from peer. if !peerState.canRequestMore() {
// TODO: keep track of from which peers we've requested data. break FOR_LOOP
// TODO: be fair. }
for _, wantedKey := range bm.state.nextWantedKeysForType(newDataType) {
if !peerState.hasData(wantedKey) {
break FOR_LOOP
}
// Request wantedKey from peer.
msg := &requestMessage{
key: dataKey,
type_: requestTypeFetch,
}
tm := p2p.TypedMessage{msgTypeRequest, msg}
sent := inMsg.MConn.Peer.TrySend(blocksInfoCh, tm.Bytes())
if sent {
// Log the request
request := &dataRequest{
peer: inMsg.MConn.Peer,
key: wantedKey,
time: time.Now(),
timeout: time.Now().Add(defaultRequestTimeout
}
bm.state.addDataRequest(request)
}
}
}
case *requestMessage: case *requestMessage:
// TODO: prevent abuse. m := msg.(*requestMessage)
switch m.type_ {
case requestTypeFetch:
// TODO: prevent abuse.
if !inMsg.MConn.Peer.CanSend(blocksDataCh) {
msg := &requestMessage{
key: dataKey,
type_: requestTypeTryAgain,
}
tm := p2p.TypedMessage{msgTypeRequest, msg}
sent := inMsg.MConn.Peer.TrySend(blocksInfoCh, tm.Bytes())
} else {
// If we don't have it, log and ignore.
block := bm.LoadBlock(m.key.height)
if block == nil {
log.Warning("Peer %v asked for nonexistant block %v", inMsg.MConn.Peer, m.key)
}
// Send the data.
msg := &dataMessage{
key: dataKey,
bytes: BinaryBytes(block),
}
tm := p2p.TypedMessage{msgTypeData, msg}
inMsg.MConn.Peer.TrySend(blocksDataCh, tm.Bytes())
}
case requestTypeCanceled:
// TODO: handle
// This requires modifying mconnection to keep track of item keys.
case requestTypeTryAgain:
// TODO: handle
default:
log.Warning("Invalid request: %v", m)
// Ignore.
}
default:
// should not happen
Panicf("Unknown message %v", msg)
// bm.sw.StopPeerForError(inMsg.MConn.Peer, errInvalidMessage)
}
}
// Cleanup
}
// Handle receiving data from the blocksData channel
func (bm *BlockManager) blocksDataHandler() {
for {
inMsg, ok := bm.sw.Receive(blocksDataCh)
if !ok {
break // Client has stopped
}
msg := decodeMessage(inMsg.Bytes)
log.Info("blocksDataHandler received %v", msg)
switch msg.(type) {
case *dataMessage: case *dataMessage:
// XXX move this to another channe
// See if we want the data. // See if we want the data.
// Validate data. // Validate data.
// Add to db. // Add to db.
@ -229,6 +306,17 @@ func (bm *BlockManager) requestsHandler() {
// Cleanup // Cleanup
} }
// Handle timed out requests by requesting from others.
func (bm *BlockManager) requestTimeoutHandler() {
for {
_, ok := <-bm.timeoutTimer
if !ok {
break
}
// Iterate over requests by time and handle timed out requests.
}
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// blockManagerState keeps track of which block parts are stored locally. // blockManagerState keeps track of which block parts are stored locally.
@ -237,9 +325,56 @@ type blockManagerState struct {
mtx sync.Mutex mtx sync.Mutex
lastBlockHeight uint64 // Last contiguous header height lastBlockHeight uint64 // Last contiguous header height
otherBlockHeights map[uint64]struct{} otherBlockHeights map[uint64]struct{}
requestsByKey map[dataKey][]*dataRequest
requestsByTimeout *Heap // Could be a linkedlist, but more flexible.
peerStates map[string]*peerState
} }
func (bms blockManagerState) stateMessage() *stateMessage { func newBlockManagerState() *blockManagerState {
return &blockManagerState{
requestsByKey: make(map[dataKey][]*dataRequest),
requestsByTimeout: NewHeap(),
peerStates: make(map[string]*peerState),
}
}
type blockManagerStateJSON struct {
LastBlockHeight uint64 // Last contiguous header height
OtherBlockHeights map[uint64]struct{}
}
func (bms *BlockManagerState) loadState(db _db.LevelDB) {
bms.mtx.Lock()
defer bms.mtx.Unlock()
stateBytes := db.Get(dbKeyState)
if stateBytes == nil {
log.Info("New BlockManager with no state")
} else {
bmsJSON := &blockManagerStateJSON{}
err := json.Unmarshal(stateBytes, bmsJSON)
if err != nil {
Panicf("Could not unmarshal state bytes: %X", stateBytes)
}
bms.lastBlockHeight = bmsJSON.LastBlockHeight
bms.otherBlockHeights = bmsJSON.OtherBlockHeights
}
}
func (bms *BlockManagerState) saveState(db _db.LevelDB) {
bms.mtx.Lock()
defer bms.mtx.Unlock()
bmsJSON := &blockManagerStateJSON{
LastBlockHeight: bms.lastBlockHeight,
OtherBlockHeights: bms.otherBlockHeights,
}
stateBytes, err := json.Marshal(bmsJSON)
if err != nil {
panic("Could not marshal state bytes")
}
db.Set(dbKeyState, stateBytes)
}
func (bms *blockManagerState) makeStateMessage() *stateMessage {
bms.mtx.Lock() bms.mtx.Lock()
defer bms.mtx.Unlock() defer bms.mtx.Unlock()
return &stateMessage{ return &stateMessage{
@ -247,12 +382,43 @@ func (bms blockManagerState) stateMessage() *stateMessage {
} }
} }
func (bms blockManagerState) addData(dataType byte, height uint64) bool { func (bms *blockManagerState) createEntryForPeer(peer *peer) {
bms.mtx.Lock() bms.mtx.Lock()
defer bms.mtx.Unlock() defer bms.mtx.Unlock()
if dataType != dataTypeBlock { bms.peerStates[peer.Key] = &peerState{peer: peer}
Panicf("Unknown datatype %X", dataType) }
func (bms *blockManagerState) deleteEntryForPeer(peer *peer) {
bms.mtx.Lock()
defer bms.mtx.Unlock()
delete(bms.peerStates, peer.Key)
}
func (bms *blockManagerState) getPeerState(peer *Peer) {
bms.mtx.Lock()
defer bms.mtx.Unlock()
return bms.peerStates[peer.Key]
}
func (bms *blockManagerState) addDataRequest(newRequest *dataRequest) {
ps.mtx.Lock()
bms.requestsByKey[newRequest.key] = append(bms.requestsByKey[newRequest.key], newRequest)
bms.requestsByTimeout.Push(newRequest) // XXX
peerState, ok := bms.peerStates[newRequest.peer.Key]
ps.mtx.Unlock()
if ok {
peerState.addDataRequest(newRequest)
} }
}
func (bms *blockManagerState) didGetDataFromPeer(key dataKey, peer *p2p.Peer) (canceled []*dataRequest, newHeight bool) {
bms.mtx.Lock()
defer bms.mtx.Unlock()
if key.dataType != dataTypeBlock {
Panicf("Unknown datatype %X", key.dataType)
}
// Adjust lastBlockHeight/otherBlockHeights.
height := key.height
if bms.lastBlockHeight == height-1 { if bms.lastBlockHeight == height-1 {
bms.lastBlockHeight = height bms.lastBlockHeight = height
height++ height++
@ -261,64 +427,126 @@ func (bms blockManagerState) addData(dataType byte, height uint64) bool {
bms.lastBlockHeight = height bms.lastBlockHeight = height
height++ height++
} }
return true newHeight = true
}
// Remove dataRequests
requests := bms.requestsByKey[key]
for _, request := range requests {
peerState, ok := bms.peerStates[peer.Key]
if ok {
peerState.removeDataRequest(request)
}
if request.peer == peer {
continue
}
canceled = append(canceled, request)
}
delete(bms.requestsByKey, key)
return canceled, newHeight
}
// Returns at most maxRequestAheadBlock dataKeys that we don't yet have &
// aren't already requesting from maxRequestsPerData peers.
func (bms *blockManagerState) nextWantedKeysForType(dataType byte) []dataKey {
bms.mtx.Lock()
defer bms.mtx.Unlock()
var keys []dataKey
switch dataType {
case dataTypeBlock:
for h := bms.lastBlockHeight + 1; h <= bms.lastBlockHeight+maxRequestAheadBlock; h++ {
if _, ok := bms.otherBlockHeights[h]; !ok {
key := newDataKey(dataTypeBlock, h)
if len(bms.requestsByKey[key]) < maxRequestsPerData {
keys = append(keys, key)
}
}
}
return keys
default:
Panicf("Unknown datatype %X", dataType)
return
} }
return false
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// dataRequest keeps track of each request for a given peice of data & peer. // dataRequest keeps track of each request for a given peice of data & peer.
type dataRequest struct { type dataRequest struct {
peer *p2p.Peer peer *p2p.Peer
dataType byte key dataKey
height uint64 time time.Time
time time.Time // XXX keep track of timeouts. timeout time.Time
}
//-----------------------------------------------------------------------------
// dataState keeps track of all requests for a given piece of data.
type dataState struct {
mtx sync.Mutex
requests []*dataRequest
}
func (ds *dataState) removeRequestsForDataType(dataType byte) []*dataRequest {
ds.mtx.Lock()
defer ds.mtx.Lock()
requests := []*dataRequest{}
filtered := []*dataRequest{}
for _, request := range ds.requests {
if request.dataType == dataType {
filtered = append(filtered, request)
} else {
requests = append(requests, request)
}
}
ds.requests = requests
return filtered
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
type peerState struct { type peerState struct {
mtx sync.Mutex mtx sync.Mutex
lastBlockHeight uint64 // Last contiguous header height peer *Peer
lastBlockHeight uint64 // Last contiguous header height
requests []*dataRequest // Active requests
// XXX we need to
} }
func (ps *peerState) applyStateMessage(msg *stateMessage) { // Returns which dataTypes are new as declared by stateMessage.
func (ps *peerState) applyStateMessage(msg *stateMessage) []byte {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
ps.lastBlockHeight = uint64(msg.lastBlockHeight) var newTypes []byte
if uint64(msg.lastBlockHeight) > ps.lastBlockHeight {
newTypes = append(newTypes, dataTypeBlock)
ps.lastBlockHeight = uint64(msg.lastBlockHeight)
} else {
log.Info("Strange, peer declares a regression of %X", dataTypeBlock)
}
return newTypes
} }
func (ps *peerState) addDataRequest(request *dataRequest) { func (ps *peerState) hasData(key dataKey) bool {
// TODO: keep track of dataRequests ps.mtx.Lock()
defer ps.mtx.Unlock()
switch key.dataType {
case dataTypeBlock:
return key.height <= ps.lastBlockHeight
default:
Panicf("Unknown datatype %X", dataType)
return false // should not happen
}
} }
func (ps *peerState) remoteDataRequest(request *dataRequest) { func (ps *peerState) addDataRequest(newRequest *dataRequest) {
// TODO: keep track of dataRequests, and remove them here. ps.mtx.Lock()
defer ps.mtx.Unlock()
for _, request := range ps.requests {
if request.key == newRequest.key {
return
}
}
ps.requests = append(ps.requests, newRequest)
return newRequest
}
func (ps *peerState) remoteDataRequest(key dataKey) bool {
ps.mtx.Lock()
defer ps.mtx.Unlock()
filtered := []*dataRequest{}
removed := false
for _, request := range ps.requests {
if request.key == key {
removed = true
} else {
filtered = append(filtered, request)
}
}
ps.requests = filtered
return removed
}
func (ps *peerState) canRequestMore() bool {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return len(ps.requests) < maxRequestsPerPeer
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
@ -367,32 +595,40 @@ func (m *stateMessage) String() string {
A requestMessage requests a block and/or header at a given height. A requestMessage requests a block and/or header at a given height.
*/ */
type requestMessage struct { type requestMessage struct {
dataType Byte key dataKey
height UInt64 type_ Byte
canceled Byte // 0x00 if request, 0x01 if cancellation
} }
const (
requestTypeFetch = Byte(0x01)
requestTypeCanceled = Byte(0x02)
requestTypeTryAgain = Byte(0x03)
)
func readRequestMessage(r io.Reader) *requestMessage { func readRequestMessage(r io.Reader) *requestMessage {
return &requestMessage{ return &requestMessage{
dataType: ReadByte(r), key: ReadDataKey(r),
height: ReadUInt64(r), type_: ReadByte(r),
canceled: ReadByte(r),
} }
} }
func (m *requestMessage) WriteTo(w io.Writer) (n int64, err error) { func (m *requestMessage) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteTo(msgTypeRequest, w, n, err) n, err = WriteTo(msgTypeRequest, w, n, err)
n, err = WriteTo(m.dataType, w, n, err) n, err = WriteTo(m.key, w, n, err)
n, err = WriteTo(m.height, w, n, err) n, err = WriteTo(m.type_, w, n, err)
n, err = WriteTo(m.canceled, w, n, err)
return return
} }
func (m *requestMessage) String() string { func (m *requestMessage) String() string {
if m.canceled == Byte(0x01) { switch m.type_ {
return fmt.Sprintf("[Cancellation %X@%v]", m.dataType, m.height) case requestTypeByte:
} else { return fmt.Sprintf("[Request(fetch) %v]", m.key)
return fmt.Sprintf("[Request %X@%v]", m.dataType, m.height) case requestTypeCanceled:
return fmt.Sprintf("[Request(canceled) %v]", m.key)
case requestTypeTryAgain:
return fmt.Sprintf("[Request(tryagain) %v]", m.key)
default:
return fmt.Sprintf("[Request(invalid) %v]", m.key)
} }
} }
@ -401,30 +637,24 @@ A dataMessage contains block data, maybe requested.
The data can be a Validation, Txs, or whole Block object. The data can be a Validation, Txs, or whole Block object.
*/ */
type dataMessage struct { type dataMessage struct {
dataType Byte key dataKey
height UInt64 bytes ByteSlice
bytes ByteSlice
} }
func readDataMessage(r io.Reader) *dataMessage { func readDataMessage(r io.Reader) *dataMessage {
dataType := ReadByte(r)
height := ReadUInt64(r)
bytes := ReadByteSlice(r)
return &dataMessage{ return &dataMessage{
dataType: dataType, key: readDataKey(r),
height: height, bytes: readByteSlice(r),
bytes: bytes,
} }
} }
func (m *dataMessage) WriteTo(w io.Writer) (n int64, err error) { func (m *dataMessage) WriteTo(w io.Writer) (n int64, err error) {
n, err = WriteTo(msgTypeData, w, n, err) n, err = WriteTo(msgTypeData, w, n, err)
n, err = WriteTo(m.dataType, w, n, err) n, err = WriteTo(m.key, w, n, err)
n, err = WriteTo(m.height, w, n, err)
n, err = WriteTo(m.bytes, w, n, err) n, err = WriteTo(m.bytes, w, n, err)
return return
} }
func (m *dataMessage) String() string { func (m *dataMessage) String() string {
return fmt.Sprintf("[Data %X@%v]", m.dataType, m.height) return fmt.Sprintf("[Data %v]", m.key)
} }

View File

@ -12,11 +12,11 @@ func NewHeap() *Heap {
return &Heap{pq: make([]*pqItem, 0)} return &Heap{pq: make([]*pqItem, 0)}
} }
func (h *Heap) Len() int { func (h *Heap) Len() int64 {
return len(h.pq) return len(h.pq)
} }
func (h *Heap) Push(value interface{}, priority int) { func (h *Heap) Push(value interface{}, priority int64) {
heap.Push(&h.pq, &pqItem{value: value, priority: priority}) heap.Push(&h.pq, &pqItem{value: value, priority: priority})
} }
@ -44,7 +44,7 @@ func main() {
type pqItem struct { type pqItem struct {
value interface{} value interface{}
priority int priority int64
index int index int
} }
@ -78,7 +78,7 @@ func (pq *priorityQueue) Pop() interface{} {
return item return item
} }
func (pq *priorityQueue) Update(item *pqItem, value interface{}, priority int) { func (pq *priorityQueue) Update(item *pqItem, value interface{}, priority int64) {
heap.Remove(pq, item.index) heap.Remove(pq, item.index)
item.value = value item.value = value
item.priority = priority item.priority = priority

View File

@ -28,6 +28,13 @@ func (db *LevelDB) Set(key []byte, value []byte) {
} }
func (db *LevelDB) Get(key []byte) []byte { func (db *LevelDB) Get(key []byte) []byte {
batch := new(leveldb.Batch)
batch.Put([]byte("foo"), []byte("value"))
batch.Put([]byte("bar"), []byte("another value"))
batch.Delete([]byte("baz"))
err = db.Write(batch, nil)
res, err := db.db.Get(key, nil) res, err := db.db.Get(key, nil)
if err != nil { if err != nil {
panic(err) panic(err)
@ -35,6 +42,9 @@ func (db *LevelDB) Get(key []byte) []byte {
return res return res
} }
func (db *LevelDB) GetRange(key []byte, start, end int) []byte {
}
func (db *LevelDB) Delete(key []byte) { func (db *LevelDB) Delete(key []byte) {
err := db.db.Delete(key, nil) err := db.db.Delete(key, nil)
if err != nil { if err != nil {

View File

@ -21,6 +21,9 @@ func (db *MemDB) Get(key []byte) []byte {
return db.db[string(key)] return db.db[string(key)]
} }
func (db *MemDB) GetRange(key []byte, start, end int) []byte {
}
func (db *MemDB) Delete(key []byte) { func (db *MemDB) Delete(key []byte) {
delete(db.db, string(key)) delete(db.db, string(key))
} }

View File

@ -41,7 +41,7 @@ type MConnection struct {
sendRate int64 sendRate int64
recvRate int64 recvRate int64
flushTimer *ThrottleTimer // flush writes as necessary but throttled. flushTimer *ThrottleTimer // flush writes as necessary but throttled.
canSend chan struct{} send chan struct{}
quit chan struct{} quit chan struct{}
pingTimer *RepeatTimer // send pings periodically pingTimer *RepeatTimer // send pings periodically
pong chan struct{} pong chan struct{}
@ -69,7 +69,7 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onError func(in
sendRate: defaultSendRate, sendRate: defaultSendRate,
recvRate: defaultRecvRate, recvRate: defaultRecvRate,
flushTimer: NewThrottleTimer(flushThrottleMS * time.Millisecond), flushTimer: NewThrottleTimer(flushThrottleMS * time.Millisecond),
canSend: make(chan struct{}, 1), send: make(chan struct{}, 1),
quit: make(chan struct{}), quit: make(chan struct{}),
pingTimer: NewRepeatTimer(pingTimeoutMinutes * time.Minute), pingTimer: NewRepeatTimer(pingTimeoutMinutes * time.Minute),
pong: make(chan struct{}), pong: make(chan struct{}),
@ -150,6 +150,10 @@ func (c *MConnection) stopForError(r interface{}) {
// Queues a message to be sent to channel. // Queues a message to be sent to channel.
func (c *MConnection) Send(chId byte, bytes ByteSlice) bool { func (c *MConnection) Send(chId byte, bytes ByteSlice) bool {
if atomic.LoadUint32(&c.stopped) == 1 {
return false
}
// Send message to channel. // Send message to channel.
channel, ok := c.channelsIdx[chId] channel, ok := c.channelsIdx[chId]
if !ok { if !ok {
@ -161,7 +165,7 @@ func (c *MConnection) Send(chId byte, bytes ByteSlice) bool {
// Wake up sendHandler if necessary // Wake up sendHandler if necessary
select { select {
case c.canSend <- struct{}{}: case c.send <- struct{}{}:
default: default:
} }
@ -171,6 +175,10 @@ func (c *MConnection) Send(chId byte, bytes ByteSlice) bool {
// Queues a message to be sent to channel. // Queues a message to be sent to channel.
// Nonblocking, returns true if successful. // Nonblocking, returns true if successful.
func (c *MConnection) TrySend(chId byte, bytes ByteSlice) bool { func (c *MConnection) TrySend(chId byte, bytes ByteSlice) bool {
if atomic.LoadUint32(&c.stopped) == 1 {
return false
}
// Send message to channel. // Send message to channel.
channel, ok := c.channelsIdx[chId] channel, ok := c.channelsIdx[chId]
if !ok { if !ok {
@ -182,7 +190,7 @@ func (c *MConnection) TrySend(chId byte, bytes ByteSlice) bool {
if ok { if ok {
// Wake up sendHandler if necessary // Wake up sendHandler if necessary
select { select {
case c.canSend <- struct{}{}: case c.send <- struct{}{}:
default: default:
} }
} }
@ -190,6 +198,19 @@ func (c *MConnection) TrySend(chId byte, bytes ByteSlice) bool {
return ok return ok
} }
func (c *MConnection) CanSend(chId byte) bool {
if atomic.LoadUint32(&c.stopped) == 1 {
return false
}
channel, ok := c.channelsIdx[chId]
if !ok {
log.Error("Unknown channel %X", chId)
return 0
}
return channel.canSend()
}
// sendHandler polls for packets to send from channels. // sendHandler polls for packets to send from channels.
func (c *MConnection) sendHandler() { func (c *MConnection) sendHandler() {
defer c._recover() defer c._recover()
@ -218,13 +239,13 @@ FOR_LOOP:
c.flush() c.flush()
case <-c.quit: case <-c.quit:
break FOR_LOOP break FOR_LOOP
case <-c.canSend: case <-c.send:
// Send some packets // Send some packets
eof := c.sendSomePackets() eof := c.sendSomePackets()
if !eof { if !eof {
// Keep sendHandler awake. // Keep sendHandler awake.
select { select {
case c.canSend <- struct{}{}: case c.send <- struct{}{}:
default: default:
} }
} }
@ -384,15 +405,16 @@ type ChannelDescriptor struct {
// TODO: lowercase. // TODO: lowercase.
// NOTE: not goroutine-safe. // NOTE: not goroutine-safe.
type Channel struct { type Channel struct {
conn *MConnection conn *MConnection
desc *ChannelDescriptor desc *ChannelDescriptor
id byte id byte
recvQueue chan InboundBytes recvQueue chan InboundBytes
sendQueue chan ByteSlice sendQueue chan ByteSlice
recving ByteSlice sendQueueSize uint32
sending ByteSlice recving ByteSlice
priority uint sending ByteSlice
recentlySent int64 // exponential moving average priority uint
recentlySent int64 // exponential moving average
} }
func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel { func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
@ -411,22 +433,39 @@ func newChannel(conn *MConnection, desc *ChannelDescriptor) *Channel {
} }
// Queues message to send to this channel. // Queues message to send to this channel.
// Goroutine-safe
func (ch *Channel) sendBytes(bytes ByteSlice) { func (ch *Channel) sendBytes(bytes ByteSlice) {
ch.sendQueue <- bytes ch.sendQueue <- bytes
atomic.AddUint32(&ch.sendQueueSize, 1)
} }
// Queues message to send to this channel. // Queues message to send to this channel.
// Nonblocking, returns true if successful. // Nonblocking, returns true if successful.
// Goroutine-safe
func (ch *Channel) trySendBytes(bytes ByteSlice) bool { func (ch *Channel) trySendBytes(bytes ByteSlice) bool {
select { select {
case ch.sendQueue <- bytes: case ch.sendQueue <- bytes:
atomic.AddUint32(&ch.sendQueueSize, 1)
return true return true
default: default:
return false return false
} }
} }
// Goroutine-safe
func (ch *Channel) sendQueueSize() (size int) {
return int(atomic.LoadUint32(&ch.sendQueueSize))
}
// Goroutine-safe
// Use only as a heuristic.
func (ch *Channel) canSend() bool {
return ch.sendQueueSize() < ch.desc.SendQueueCapacity
}
// Returns true if any packets are pending to be sent. // Returns true if any packets are pending to be sent.
// Call before calling nextPacket()
// Goroutine-safe
func (ch *Channel) sendPending() bool { func (ch *Channel) sendPending() bool {
if len(ch.sending) == 0 { if len(ch.sending) == 0 {
if len(ch.sendQueue) == 0 { if len(ch.sendQueue) == 0 {
@ -438,6 +477,7 @@ func (ch *Channel) sendPending() bool {
} }
// Creates a new packet to send. // Creates a new packet to send.
// Not goroutine-safe
func (ch *Channel) nextPacket() packet { func (ch *Channel) nextPacket() packet {
packet := packet{} packet := packet{}
packet.ChannelId = Byte(ch.id) packet.ChannelId = Byte(ch.id)
@ -445,6 +485,7 @@ func (ch *Channel) nextPacket() packet {
if len(ch.sending) <= maxPacketSize { if len(ch.sending) <= maxPacketSize {
packet.EOF = Byte(0x01) packet.EOF = Byte(0x01)
ch.sending = nil ch.sending = nil
atomic.AddUint32(&ch.sendQueueSize, ^uint32(0)) // decrement sendQueueSize
} else { } else {
packet.EOF = Byte(0x00) packet.EOF = Byte(0x00)
ch.sending = ch.sending[MinInt(maxPacketSize, len(ch.sending)):] ch.sending = ch.sending[MinInt(maxPacketSize, len(ch.sending)):]
@ -453,6 +494,7 @@ func (ch *Channel) nextPacket() packet {
} }
// Writes next packet to w. // Writes next packet to w.
// Not goroutine-safe
func (ch *Channel) writePacketTo(w io.Writer) (n int64, err error) { func (ch *Channel) writePacketTo(w io.Writer) (n int64, err error) {
packet := ch.nextPacket() packet := ch.nextPacket()
n, err = WriteTo(packetTypeMessage, w, n, err) n, err = WriteTo(packetTypeMessage, w, n, err)
@ -464,6 +506,7 @@ func (ch *Channel) writePacketTo(w io.Writer) (n int64, err error) {
} }
// Handles incoming packets. // Handles incoming packets.
// Not goroutine-safe
func (ch *Channel) recvPacket(pkt packet) { func (ch *Channel) recvPacket(pkt packet) {
ch.recving = append(ch.recving, pkt.Bytes...) ch.recving = append(ch.recving, pkt.Bytes...)
if pkt.EOF == Byte(0x01) { if pkt.EOF == Byte(0x01) {
@ -473,6 +516,7 @@ func (ch *Channel) recvPacket(pkt packet) {
} }
// Call this periodically to update stats for throttling purposes. // Call this periodically to update stats for throttling purposes.
// Not goroutine-safe
func (ch *Channel) updateStats() { func (ch *Channel) updateStats() {
// Exponential decay of stats. // Exponential decay of stats.
// TODO: optimize. // TODO: optimize.

View File

@ -55,13 +55,6 @@ func (p *Peer) IsOutbound() bool {
return p.outbound return p.outbound
} }
func (p *Peer) TrySend(chId byte, bytes ByteSlice) bool {
if atomic.LoadUint32(&p.stopped) == 1 {
return false
}
return p.mconn.TrySend(chId, bytes)
}
func (p *Peer) Send(chId byte, bytes ByteSlice) bool { func (p *Peer) Send(chId byte, bytes ByteSlice) bool {
if atomic.LoadUint32(&p.stopped) == 1 { if atomic.LoadUint32(&p.stopped) == 1 {
return false return false
@ -69,8 +62,22 @@ func (p *Peer) Send(chId byte, bytes ByteSlice) bool {
return p.mconn.Send(chId, bytes) return p.mconn.Send(chId, bytes)
} }
func (p *Peer) TrySend(chId byte, bytes ByteSlice) bool {
if atomic.LoadUint32(&p.stopped) == 1 {
return false
}
return p.mconn.TrySend(chId, bytes)
}
func (o *Peer) CanSend(chId byte) int {
if atomic.LoadUint32(&p.stopped) == 1 {
return 0
}
return p.mconn.CanSend(chId)
}
func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { func (p *Peer) WriteTo(w io.Writer) (n int64, err error) {
return p.mconn.RemoteAddress.WriteTo(w) return String(p.Key).WriteTo(w)
} }
func (p *Peer) String() string { func (p *Peer) String() string {
@ -82,5 +89,5 @@ func (p *Peer) String() string {
} }
func (p *Peer) Equals(other *Peer) bool { func (p *Peer) Equals(other *Peer) bool {
return p.mconn.RemoteAddress.Equals(other.mconn.RemoteAddress) return p.Key == other.Key
} }

411
sim/bench.go Normal file
View File

@ -0,0 +1,411 @@
package main
import (
"container/heap"
"fmt"
"math/rand"
)
const seed = 0
const numNodes = 50000 // Total number of nodes to simulate
const minNumPeers = 10 // Each node should be connected to at least this many peers
const maxNumPeers = 15 // ... and at most this many
const latencyMS = int32(500) // One way packet latency
const partTxMS = int32(10) // Transmission time per peer of 4KB of data.
const sendQueueCapacity = 100 // Amount of messages to queue between peers.
func init() {
rand.Seed(seed)
}
//-----------------------------------------------------------------------------
type Peer struct {
node *Node // Pointer to node
sent int32 // Time of last packet send, including transmit time.
remote int // SomeNode.peers[x].node.peers[remote].node is SomeNode for all x.
parts []byte // [32]byte{} bitarray of received block pieces.
}
// Send a data event to the peer, or return false if queue is "full".
// Depending on how many event packets are "queued" for peer,
// the actual recvTime may be adjusted to be later.
func (p *Peer) sendEventData(event EventData) bool {
desiredRecvTime := event.RecvTime()
minRecvTime := p.sent + partTxMS + latencyMS
if desiredRecvTime >= minRecvTime {
p.node.sendEvent(event)
p.sent += partTxMS
return true
} else {
if (minRecvTime-desiredRecvTime)/partTxMS > sendQueueCapacity {
return false
} else {
event.SetRecvTime(minRecvTime) // Adjust recvTime
p.node.sendEvent(event)
p.sent += partTxMS
return true
}
}
}
// Since EventPart events are much smaller, we don't consider the transmit time,
// and assume that the sendQueue is always free.
func (p *Peer) sendEventParts(event EventParts) {
p.node.sendEvent(event)
}
// Does the peer's .parts (as received by an EventParts event) contain part?
func (p *Peer) knownToHave(part uint8) bool {
return p.parts[part/8]&(1<<(part%8)) > 0
}
//-----------------------------------------------------------------------------
type Node struct {
index int
peers []*Peer
parts []byte
events *Heap
}
func (n *Node) sendEvent(event Event) {
n.events.Push(event, event.RecvTime())
}
func (n *Node) recvEvent() Event {
return n.events.Pop().(Event)
}
func (n *Node) receive(part uint8) bool {
x := n.parts[part/8]
nx := x | (1 << (part % 8))
if x == nx {
return false
} else {
n.parts[part/8] = nx
return true
}
}
// returns false if already connected, or remote node has too many connections.
func (n *Node) canConnectTo(node *Node) bool {
if len(n.peers) > maxNumPeers {
return false
}
for _, peer := range n.peers {
if peer.node == node {
return false
}
}
return true
}
func (n *Node) isFull() bool {
for _, part := range n.parts {
if part != byte(0xff) {
return false
}
}
return true
}
func (n *Node) String() string {
return fmt.Sprintf("{N:%d}", n.index)
}
//-----------------------------------------------------------------------------
type Event interface {
RecvTime() int32
SetRecvTime(int32)
}
type EventData struct {
time int32 // time of receipt.
part uint8
}
func (e EventData) RecvTime() int32 {
return e.time
}
func (e EventData) SetRecvTime(time int32) {
e.time = time
}
func (e EventData) String() string {
return fmt.Sprintf("[%d:%d]", e.time, e.part)
}
type EventParts struct {
time int32 // time of receipt.
src int // src node's peer index on destination node.
parts []byte
}
func (e EventParts) RecvTime() int32 {
return e.time
}
func (e EventParts) SetRecvTime(time int32) {
e.time = time
}
func (e EventParts) String() string {
return fmt.Sprintf("[%d:%d:%d]", e.time, e.src, e.parts)
}
//-----------------------------------------------------------------------------
func createNetwork() []*Node {
nodes := make([]*Node, numNodes)
for i := 0; i < numNodes; i++ {
n := &Node{
index: i,
peers: []*Peer{},
parts: make([]byte, 32),
events: NewHeap(),
}
nodes[i] = n
}
for i := 0; i < numNodes; i++ {
n := nodes[i]
for j := 0; j < minNumPeers; j++ {
if len(n.peers) > j {
// Already set, continue
continue
}
pidx := rand.Intn(numNodes)
for !n.canConnectTo(nodes[pidx]) {
pidx = rand.Intn(numNodes)
}
// connect to nodes[pidx]
remote := nodes[pidx]
remote_j := len(remote.peers)
n.peers = append(n.peers, &Peer{node: remote, remote: remote_j, parts: make([]byte, 32)})
remote.peers = append(remote.peers, &Peer{node: n, remote: j, parts: make([]byte, 32)})
}
}
return nodes
}
func printNodes(nodes []*Node) {
for _, node := range nodes {
peerStr := ""
for _, peer := range node.peers {
peerStr += fmt.Sprintf(" %v", peer.node.index)
}
fmt.Printf("[%v] peers: %v\n", node.index, peerStr)
}
}
func countFull(nodes []*Node) (fullCount int) {
for _, node := range nodes {
if node.isFull() {
fullCount += 1
}
}
return fullCount
}
func main() {
// Global vars
nodes := createNetwork()
timeMS := int32(0)
proposer := nodes[0]
for i := 0; i < 32; i++ {
proposer.parts[i] = byte(0xff)
}
//printNodes(nodes[:])
// The proposer sends parts to all of its peers.
for i := 0; i < len(proposer.peers); i++ {
timeMS := int32(0) // scoped
peer := proposer.peers[i]
for j := 0; j < 256; j++ {
// Send each part to a peer, but each peer starts at a different offset.
part := uint8((j + i*25) % 256)
recvTime := timeMS + latencyMS + partTxMS
event := EventData{
time: recvTime,
part: part,
}
peer.sendEventData(event)
timeMS += partTxMS
}
}
// Run simulation
for {
// Lets run the simulation for each user until endTimeMS
// We use latencyMS/2 since causality has at least this much lag.
endTimeMS := timeMS + latencyMS/2
fmt.Printf("simulating until %v\n", endTimeMS)
// Print out the network for debugging
if true {
for i := 40000; i < 40050; i++ {
node := nodes[i]
fmt.Printf("[%v] parts: %X\n", node.index, node.parts)
}
}
for _, node := range nodes {
// Iterate over the events of this node until event.time >= endTimeMS
for {
_event, ok := node.events.Peek().(Event)
if !ok || _event.RecvTime() >= endTimeMS {
break
} else {
node.events.Pop()
}
switch _event.(type) {
case EventData:
event := _event.(EventData)
// Process this event
if !node.receive(event.part) {
// Already has this part, ignore this event.
continue
}
// Let's iterate over peers & see which needs this piece.
recvTime := event.time + latencyMS + partTxMS
for _, peer := range node.peers {
if peer.knownToHave(event.part) {
continue
}
peer.sendEventData(EventData{
time: recvTime,
part: event.part,
})
}
case EventParts:
event := _event.(EventParts)
node.peers[event.src].parts = event.parts
}
}
}
// If network is full, quit.
if countFull(nodes) == numNodes {
fmt.Printf("Done! took %v ms", timeMS)
break
}
// Lets increment the timeMS now
timeMS += latencyMS / 2
// Send EventParts rather frequently. It's cheap.
for _, node := range nodes {
for _, peer := range node.peers {
peer.sendEventParts(EventParts{
time: timeMS + latencyMS,
src: peer.remote,
parts: node.parts,
})
}
newParts := make([]byte, 32)
copy(newParts, node.parts)
node.parts = newParts
}
}
}
// ----------------------------------------------------------------------------
type Heap struct {
pq priorityQueue
}
func NewHeap() *Heap {
return &Heap{pq: make([]*pqItem, 0)}
}
func (h *Heap) Len() int {
return len(h.pq)
}
func (h *Heap) Peek() interface{} {
if len(h.pq) == 0 {
return nil
}
return h.pq[0].value
}
func (h *Heap) Push(value interface{}, priority int32) {
heap.Push(&h.pq, &pqItem{value: value, priority: priority})
}
func (h *Heap) Pop() interface{} {
item := heap.Pop(&h.pq).(*pqItem)
return item.value
}
/*
func main() {
h := NewHeap()
h.Push(String("msg1"), 1)
h.Push(String("msg3"), 3)
h.Push(String("msg2"), 2)
fmt.Println(h.Pop())
fmt.Println(h.Pop())
fmt.Println(h.Pop())
}
*/
///////////////////////
// From: http://golang.org/pkg/container/heap/#example__priorityQueue
type pqItem struct {
value interface{}
priority int32
index int
}
type priorityQueue []*pqItem
func (pq priorityQueue) Len() int { return len(pq) }
func (pq priorityQueue) Less(i, j int) bool {
return pq[i].priority < pq[j].priority
}
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *priorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*pqItem)
item.index = n
*pq = append(*pq, item)
}
func (pq *priorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}
func (pq *priorityQueue) Update(item *pqItem, value interface{}, priority int32) {
heap.Remove(pq, item.index)
item.value = value
item.priority = priority
heap.Push(pq, item)
}