Optimizing blockchain reactor.

Should be paired with https://github.com/tendermint/iavl/pull/65.
This commit is contained in:
Jae Kwon
2018-06-21 01:57:35 -07:00
parent f62d6651e3
commit 8128627f08
8 changed files with 102 additions and 96 deletions

16
Gopkg.lock generated
View File

@ -130,7 +130,6 @@
version = "v1.0" version = "v1.0"
[[projects]] [[projects]]
branch = "master"
name = "github.com/jmhodges/levigo" name = "github.com/jmhodges/levigo"
packages = ["."] packages = ["."]
revision = "c42d9e0ca023e2198120196f842701bb4c55d7b9" revision = "c42d9e0ca023e2198120196f842701bb4c55d7b9"
@ -286,19 +285,6 @@
] ]
revision = "e2150783cd35f5b607daca48afd8c57ec54cc995" revision = "e2150783cd35f5b607daca48afd8c57ec54cc995"
[[projects]]
name = "github.com/tendermint/abci"
packages = [
"client",
"example/code",
"example/counter",
"example/kvstore",
"server",
"types"
]
revision = "198dccf0ddfd1bb176f87657e3286a05a6ed9540"
version = "v0.12.0"
[[projects]] [[projects]]
branch = "master" branch = "master"
name = "github.com/tendermint/ed25519" name = "github.com/tendermint/ed25519"
@ -435,6 +421,6 @@
[solve-meta] [solve-meta]
analyzer-name = "dep" analyzer-name = "dep"
analyzer-version = 1 analyzer-version = 1
inputs-digest = "d17038089dd6383ff5028229d4026bb92f5c7adc7e9c1cd52584237e2e5fd431" inputs-digest = "400de835ace8c8a69747afd675d1952daf750c251a02b9dac82a3c9dce4f65a8"
solver-name = "gps-cdcl" solver-name = "gps-cdcl"
solver-version = 1 solver-version = 1

View File

@ -86,6 +86,10 @@
name = "google.golang.org/genproto" name = "google.golang.org/genproto"
revision = "7fd901a49ba6a7f87732eb344f6e3c5b19d1b200" revision = "7fd901a49ba6a7f87732eb344f6e3c5b19d1b200"
[[override]]
name = "github.com/jmhodges/levigo"
revision = "c42d9e0ca023e2198120196f842701bb4c55d7b9"
[prune] [prune]
go-tests = true go-tests = true
unused-packages = true unused-packages = true

View File

@ -2,7 +2,7 @@ GOTOOLS = \
github.com/golang/dep/cmd/dep \ github.com/golang/dep/cmd/dep \
gopkg.in/alecthomas/gometalinter.v2 gopkg.in/alecthomas/gometalinter.v2
PACKAGES=$(shell go list ./... | grep -v '/vendor/') PACKAGES=$(shell go list ./... | grep -v '/vendor/')
BUILD_TAGS?=tendermint BUILD_TAGS?='tendermint'
BUILD_FLAGS = -ldflags "-X github.com/tendermint/tendermint/version.GitCommit=`git rev-parse --short=8 HEAD`" BUILD_FLAGS = -ldflags "-X github.com/tendermint/tendermint/version.GitCommit=`git rev-parse --short=8 HEAD`"
all: check build test install all: check build test install
@ -14,20 +14,20 @@ check: check_tools ensure_deps
### Build ### Build
build: build:
CGO_ENABLED=0 go build $(BUILD_FLAGS) -tags '$(BUILD_TAGS)' -o build/tendermint ./cmd/tendermint/ CGO_ENABLED=0 go build $(BUILD_FLAGS) -tags $(BUILD_TAGS) -o build/tendermint ./cmd/tendermint/
build_race: build_race:
CGO_ENABLED=0 go build -race $(BUILD_FLAGS) -tags '$(BUILD_TAGS)' -o build/tendermint ./cmd/tendermint CGO_ENABLED=0 go build -race $(BUILD_FLAGS) -tags $(BUILD_TAGS) -o build/tendermint ./cmd/tendermint
install: install:
CGO_ENABLED=0 go install $(BUILD_FLAGS) -tags '$(BUILD_TAGS)' ./cmd/tendermint CGO_ENABLED=0 go install $(BUILD_FLAGS) -tags $(BUILD_TAGS) ./cmd/tendermint
######################################## ########################################
### Distribution ### Distribution
# dist builds binaries for all platforms and packages them for distribution # dist builds binaries for all platforms and packages them for distribution
dist: dist:
@BUILD_TAGS='$(BUILD_TAGS)' sh -c "'$(CURDIR)/scripts/dist.sh'" @BUILD_TAGS=$(BUILD_TAGS) sh -c "'$(CURDIR)/scripts/dist.sh'"
######################################## ########################################
### Tools & dependencies ### Tools & dependencies
@ -66,7 +66,7 @@ draw_deps:
get_deps_bin_size: get_deps_bin_size:
@# Copy of build recipe with additional flags to perform binary size analysis @# Copy of build recipe with additional flags to perform binary size analysis
$(eval $(shell go build -work -a $(BUILD_FLAGS) -tags '$(BUILD_TAGS)' -o build/tendermint ./cmd/tendermint/ 2>&1)) $(eval $(shell go build -work -a $(BUILD_FLAGS) -tags $(BUILD_TAGS) -o build/tendermint ./cmd/tendermint/ 2>&1))
@find $(WORK) -type f -name "*.a" | xargs -I{} du -hxs "{}" | sort -rh | sed -e s:${WORK}/::g > deps_bin_size.log @find $(WORK) -type f -name "*.a" | xargs -I{} du -hxs "{}" | sort -rh | sed -e s:${WORK}/::g > deps_bin_size.log
@echo "Results can be found here: $(CURDIR)/deps_bin_size.log" @echo "Results can be found here: $(CURDIR)/deps_bin_size.log"
@ -132,7 +132,7 @@ vagrant_test:
### go tests ### go tests
test: test:
@echo "--> Running go test" @echo "--> Running go test"
@go test $(PACKAGES) @GOCACHE=off go test $(PACKAGES)
test_race: test_race:
@echo "--> Running go test --race" @echo "--> Running go test --race"

View File

@ -29,10 +29,10 @@ eg, L = latency = 0.1s
*/ */
const ( const (
requestIntervalMS = 100 requestIntervalMS = 2
maxTotalRequesters = 1000 maxTotalRequesters = 600
maxPendingRequests = maxTotalRequesters maxPendingRequests = maxTotalRequesters
maxPendingRequestsPerPeer = 50 maxPendingRequestsPerPeer = 20
// Minimum recv rate to ensure we're receiving blocks from a peer fast // Minimum recv rate to ensure we're receiving blocks from a peer fast
// enough. If a peer is not sending us data at at least that rate, we // enough. If a peer is not sending us data at at least that rate, we
@ -219,14 +219,12 @@ func (pool *BlockPool) RedoRequest(height int64) p2p.ID {
defer pool.mtx.Unlock() defer pool.mtx.Unlock()
request := pool.requesters[height] request := pool.requesters[height]
peerID := request.getPeerID()
if request.block == nil { if peerID != p2p.ID("") {
panic("Expected block to be non-nil") // RemovePeer will redo all requesters associated with this peer.
pool.removePeer(peerID)
} }
return peerID
// RemovePeer will redo all requesters associated with this peer.
pool.removePeer(request.peerID)
return request.peerID
} }
// TODO: ensure that blocks come in order for each peer. // TODO: ensure that blocks come in order for each peer.

View File

@ -17,7 +17,8 @@ const (
// BlockchainChannel is a channel for blocks and status updates (`BlockStore` height) // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
BlockchainChannel = byte(0x40) BlockchainChannel = byte(0x40)
trySyncIntervalMS = 50 trySyncIntervalMS = 10
// stop syncing when last block's time is // stop syncing when last block's time is
// within this much of the system time. // within this much of the system time.
// stopSyncingDurationMinutes = 10 // stopSyncingDurationMinutes = 10
@ -75,8 +76,9 @@ func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *Bl
store.Height())) store.Height()))
} }
const capacity = 1000 // must be bigger than peers count requestsCh := make(chan BlockRequest, maxTotalRequesters)
requestsCh := make(chan BlockRequest, capacity)
const capacity = 1000 // must be bigger than peers count
errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock errorsCh := make(chan peerError, capacity) // so we don't block in #Receive#pool.AddBlock
pool := NewBlockPool( pool := NewBlockPool(
@ -208,7 +210,6 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
// Handle messages from the poolReactor telling the reactor what to do. // Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
// (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
func (bcR *BlockchainReactor) poolRoutine() { func (bcR *BlockchainReactor) poolRoutine() {
trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
@ -223,6 +224,8 @@ func (bcR *BlockchainReactor) poolRoutine() {
lastHundred := time.Now() lastHundred := time.Now()
lastRate := 0.0 lastRate := 0.0
didProcessCh := make(chan struct{}, 1)
FOR_LOOP: FOR_LOOP:
for { for {
select { select {
@ -238,14 +241,17 @@ FOR_LOOP:
// The pool handles timeouts, just let it go. // The pool handles timeouts, just let it go.
continue FOR_LOOP continue FOR_LOOP
} }
case err := <-bcR.errorsCh: case err := <-bcR.errorsCh:
peer := bcR.Switch.Peers().Get(err.peerID) peer := bcR.Switch.Peers().Get(err.peerID)
if peer != nil { if peer != nil {
bcR.Switch.StopPeerForError(peer, err) bcR.Switch.StopPeerForError(peer, err)
} }
case <-statusUpdateTicker.C: case <-statusUpdateTicker.C:
// ask for status updates // ask for status updates
go bcR.BroadcastStatusRequest() // nolint: errcheck go bcR.BroadcastStatusRequest() // nolint: errcheck
case <-switchToConsensusTicker.C: case <-switchToConsensusTicker.C:
height, numPending, lenRequesters := bcR.pool.GetStatus() height, numPending, lenRequesters := bcR.pool.GetStatus()
outbound, inbound, _ := bcR.Switch.NumPeers() outbound, inbound, _ := bcR.Switch.NumPeers()
@ -260,60 +266,78 @@ FOR_LOOP:
break FOR_LOOP break FOR_LOOP
} }
case <-trySyncTicker.C: // chan time case <-trySyncTicker.C: // chan time
// This loop can be slow as long as it's doing syncing work. select {
SYNC_LOOP: case didProcessCh <- struct{}{}:
for i := 0; i < 10; i++ { default:
// See if there are any blocks to sync. }
first, second := bcR.pool.PeekTwoBlocks()
//bcR.Logger.Info("TrySync peeked", "first", first, "second", second) case <-didProcessCh:
if first == nil || second == nil { // NOTE: It is a subtle mistake to process more than a single block
// We need both to sync the first block. // at a time (e.g. 10) here, because we only TrySend 1 request per
break SYNC_LOOP // loop. The ratio mismatch can result in starving of blocks, a
// sudden burst of requests and responses, and repeat.
// Consequently, it is better to split these routines rather than
// coupling them as it's written here. TODO uncouple from request
// routine.
// See if there are any blocks to sync.
first, second := bcR.pool.PeekTwoBlocks()
//bcR.Logger.Info("TrySync peeked", "first", first, "second", second)
if first == nil || second == nil {
// We need both to sync the first block.
continue FOR_LOOP
} else {
// Try again quickly next loop.
didProcessCh <- struct{}{}
}
firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes)
firstPartsHeader := firstParts.Header()
firstID := types.BlockID{first.Hash(), firstPartsHeader}
// Finally, verify the first block using the second's commit
// NOTE: we can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary.
err := state.Validators.VerifyCommit(
chainID, firstID, first.Height, second.LastCommit)
if err != nil {
bcR.Logger.Error("Error in validation", "err", err)
peerID := bcR.pool.RedoRequest(first.Height)
peer := bcR.Switch.Peers().Get(peerID)
if peer != nil {
// NOTE: we've already removed the peer's request, but we
// still need to clean up the rest.
bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err))
} }
firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes) continue FOR_LOOP
firstPartsHeader := firstParts.Header() } else {
firstID := types.BlockID{first.Hash(), firstPartsHeader} bcR.pool.PopRequest()
// Finally, verify the first block using the second's commit
// NOTE: we can probably make this more efficient, but note that calling // TODO: batch saves so we dont persist to disk every block
// first.Hash() doesn't verify the tx contents, so MakePartSet() is bcR.store.SaveBlock(first, firstParts, second.LastCommit)
// currently necessary.
err := state.Validators.VerifyCommit( // TODO: same thing for app - but we would need a way to
chainID, firstID, first.Height, second.LastCommit) // get the hash without persisting the state
var err error
state, err = bcR.blockExec.ApplyBlock(state, firstID, first)
if err != nil { if err != nil {
bcR.Logger.Error("Error in validation", "err", err) // TODO This is bad, are we zombie?
peerID := bcR.pool.RedoRequest(first.Height) cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v",
peer := bcR.Switch.Peers().Get(peerID) first.Height, first.Hash(), err))
if peer != nil { }
bcR.Switch.StopPeerForError(peer, fmt.Errorf("BlockchainReactor validation error: %v", err)) blocksSynced++
}
break SYNC_LOOP
} else {
bcR.pool.PopRequest()
// TODO: batch saves so we don't persist to disk every block if blocksSynced%100 == 0 {
bcR.store.SaveBlock(first, firstParts, second.LastCommit) lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height,
// TODO: same thing for app - but we would need a way to "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
// get the hash without persisting the state lastHundred = time.Now()
var err error
state, err = bcR.blockExec.ApplyBlock(state, firstID, first)
if err != nil {
// TODO This is bad, are we a zombie? cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v",
cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v",
first.Height, first.Hash(), err))
}
blocksSynced++
if blocksSynced%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height,
"max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
lastHundred = time.Now()
}
} }
} }
continue FOR_LOOP continue FOR_LOOP
case <-bcR.Quit(): case <-bcR.Quit():
break FOR_LOOP break FOR_LOOP
} }

View File

@ -183,7 +183,7 @@ func TestDifferByTimestamp(t *testing.T) {
assert.NoError(t, err, "expected no error signing proposal") assert.NoError(t, err, "expected no error signing proposal")
signBytes := proposal.SignBytes(chainID) signBytes := proposal.SignBytes(chainID)
sig := proposal.Signature sig := proposal.Signature
timeStamp := clipToMS(proposal.Timestamp) timeStamp := proposal.Timestamp
// manipulate the timestamp. should get changed back // manipulate the timestamp. should get changed back
proposal.Timestamp = proposal.Timestamp.Add(time.Millisecond) proposal.Timestamp = proposal.Timestamp.Add(time.Millisecond)
@ -207,7 +207,7 @@ func TestDifferByTimestamp(t *testing.T) {
signBytes := vote.SignBytes(chainID) signBytes := vote.SignBytes(chainID)
sig := vote.Signature sig := vote.Signature
timeStamp := clipToMS(vote.Timestamp) timeStamp := vote.Timestamp
// manipulate the timestamp. should get changed back // manipulate the timestamp. should get changed back
vote.Timestamp = vote.Timestamp.Add(time.Millisecond) vote.Timestamp = vote.Timestamp.Add(time.Millisecond)
@ -242,10 +242,3 @@ func newProposal(height int64, round int, partsHeader types.PartSetHeader) *type
Timestamp: time.Now().UTC(), Timestamp: time.Now().UTC(),
} }
} }
func clipToMS(t time.Time) time.Time {
nano := t.UnixNano()
million := int64(1000000)
nano = (nano / million) * million
return time.Unix(0, nano).UTC()
}

View File

@ -80,6 +80,7 @@ func loadState(db dbm.DB, key []byte) (state State) {
} }
// SaveState persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database. // SaveState persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database.
// This flushes the writes (e.g. calls SetSync).
func SaveState(db dbm.DB, state State) { func SaveState(db dbm.DB, state State) {
saveState(db, state, stateKey) saveState(db, state, stateKey)
} }
@ -148,7 +149,7 @@ func LoadABCIResponses(db dbm.DB, height int64) (*ABCIResponses, error) {
// This is useful in case we crash after app.Commit and before s.Save(). // This is useful in case we crash after app.Commit and before s.Save().
// Responses are indexed by height so they can also be loaded later to produce Merkle proofs. // Responses are indexed by height so they can also be loaded later to produce Merkle proofs.
func saveABCIResponses(db dbm.DB, height int64, abciResponses *ABCIResponses) { func saveABCIResponses(db dbm.DB, height int64, abciResponses *ABCIResponses) {
db.SetSync(calcABCIResponsesKey(height), abciResponses.Bytes()) db.Set(calcABCIResponsesKey(height), abciResponses.Bytes())
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
@ -213,7 +214,7 @@ func saveValidatorsInfo(db dbm.DB, nextHeight, changeHeight int64, valSet *types
if changeHeight == nextHeight { if changeHeight == nextHeight {
valInfo.ValidatorSet = valSet valInfo.ValidatorSet = valSet
} }
db.SetSync(calcValidatorsKey(nextHeight), valInfo.Bytes()) db.Set(calcValidatorsKey(nextHeight), valInfo.Bytes())
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
@ -278,5 +279,5 @@ func saveConsensusParamsInfo(db dbm.DB, nextHeight, changeHeight int64, params t
if changeHeight == nextHeight { if changeHeight == nextHeight {
paramsInfo.ConsensusParams = params paramsInfo.ConsensusParams = params
} }
db.SetSync(calcConsensusParamsKey(nextHeight), paramsInfo.Bytes()) db.Set(calcConsensusParamsKey(nextHeight), paramsInfo.Bytes())
} }

View File

@ -9,7 +9,7 @@ import (
// Canonical json is amino's json for structs with fields in alphabetical order // Canonical json is amino's json for structs with fields in alphabetical order
// TimeFormat is used for generating the sigs // TimeFormat is used for generating the sigs
const TimeFormat = "2006-01-02T15:04:05.000Z" const TimeFormat = time.RFC3339Nano
type CanonicalJSONBlockID struct { type CanonicalJSONBlockID struct {
Hash cmn.HexBytes `json:"hash,omitempty"` Hash cmn.HexBytes `json:"hash,omitempty"`