consolidate saveResults/SaveABCIResponses

This commit is contained in:
Ethan Buchman 2017-12-25 13:47:16 -05:00
parent d65234ed51
commit 73fb1c3a17
6 changed files with 120 additions and 154 deletions

View File

@ -42,7 +42,7 @@ type (
Height int64 Height int64
} }
ErrNoResultsForHeight struct { ErrNoABCIResponsesForHeight struct {
Height int64 Height int64
} }
) )
@ -74,6 +74,6 @@ func (e ErrNoConsensusParamsForHeight) Error() string {
return cmn.Fmt("Could not find consensus params for height #%d", e.Height) return cmn.Fmt("Could not find consensus params for height #%d", e.Height)
} }
func (e ErrNoResultsForHeight) Error() string { func (e ErrNoABCIResponsesForHeight) Error() string {
return cmn.Fmt("Could not find results for height #%d", e.Height) return cmn.Fmt("Could not find results for height #%d", e.Height)
} }

View File

@ -52,25 +52,25 @@ func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn p
// TODO: make use of res.Log // TODO: make use of res.Log
// TODO: make use of this info // TODO: make use of this info
// Blocks may include invalid txs. // Blocks may include invalid txs.
// reqDeliverTx := req.(abci.RequestDeliverTx) txRes := r.DeliverTx
txResult := r.DeliverTx if txRes.Code == abci.CodeTypeOK {
if txResult.Code == abci.CodeTypeOK {
validTxs++ validTxs++
} else { } else {
logger.Debug("Invalid tx", "code", txResult.Code, "log", txResult.Log) logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
invalidTxs++ invalidTxs++
} }
// NOTE: if we count we can access the tx from the block instead of // NOTE: if we count we can access the tx from the block instead of
// pulling it from the req // pulling it from the req
tx := types.Tx(req.GetDeliverTx().Tx)
txEventPublisher.PublishEventTx(types.EventDataTx{types.TxResult{ txEventPublisher.PublishEventTx(types.EventDataTx{types.TxResult{
Height: block.Height, Height: block.Height,
Index: uint32(txIndex), Index: uint32(txIndex),
Tx: types.Tx(req.GetDeliverTx().Tx), Tx: tx,
Result: *txResult, Result: *txRes,
}}) }})
abciResponses.DeliverTx[txIndex] = txResult abciResponses.DeliverTx[txIndex] = txRes
txIndex++ txIndex++
} }
} }
@ -84,6 +84,8 @@ func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn p
} }
} }
// TODO: determine which validators were byzantine
// Begin block // Begin block
_, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{ _, err := proxyAppConn.BeginBlockSync(abci.RequestBeginBlock{
Hash: block.Hash(), Hash: block.Hash(),
@ -322,7 +324,7 @@ func (s *State) ApplyBlock(txEventPublisher types.TxEventPublisher, proxyAppConn
fail.Fail() // XXX fail.Fail() // XXX
// save the results before we commit // save the results before we commit
s.SaveABCIResponses(abciResponses) s.SaveABCIResponses(block.Height, abciResponses)
fail.Fail() // XXX fail.Fail() // XXX

View File

@ -20,8 +20,7 @@ import (
// database keys // database keys
var ( var (
stateKey = []byte("stateKey") stateKey = []byte("stateKey")
abciResponsesKey = []byte("abciResponsesKey")
) )
func calcValidatorsKey(height int64) []byte { func calcValidatorsKey(height int64) []byte {
@ -32,8 +31,8 @@ func calcConsensusParamsKey(height int64) []byte {
return []byte(cmn.Fmt("consensusParamsKey:%v", height)) return []byte(cmn.Fmt("consensusParamsKey:%v", height))
} }
func calcResultsKey(height int64) []byte { func calcABCIResponsesKey(height int64) []byte {
return []byte(cmn.Fmt("resultsKey:%v", height)) return []byte(cmn.Fmt("abciResponsesKey:%v", height))
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
@ -75,10 +74,8 @@ type State struct {
LastConsensusParams types.ConsensusParams LastConsensusParams types.ConsensusParams
LastHeightConsensusParamsChanged int64 LastHeightConsensusParamsChanged int64
// Store LastABCIResults along with hash // Merkle root of the results from executing prev block
LastResults types.ABCIResults // TODO: remove?? LastResultHash []byte
LastResultHash []byte // this is the one for the next block to propose
LastLastResultHash []byte // this verifies the last block?
// The latest AppHash we've received from calling abci.Commit() // The latest AppHash we've received from calling abci.Commit()
AppHash []byte AppHash []byte
@ -154,7 +151,6 @@ func (s *State) Copy() *State {
AppHash: s.AppHash, AppHash: s.AppHash,
LastResults: s.LastResults,
LastResultHash: s.LastResultHash, LastResultHash: s.LastResultHash,
logger: s.logger, logger: s.logger,
@ -168,23 +164,23 @@ func (s *State) Save() {
s.saveValidatorsInfo() s.saveValidatorsInfo()
s.saveConsensusParamsInfo() s.saveConsensusParamsInfo()
s.saveResults()
s.db.SetSync(stateKey, s.Bytes()) s.db.SetSync(stateKey, s.Bytes())
} }
// SaveABCIResponses persists the ABCIResponses to the database. // SaveABCIResponses persists the ABCIResponses to the database.
// This is useful in case we crash after app.Commit and before s.Save(). // This is useful in case we crash after app.Commit and before s.Save().
func (s *State) SaveABCIResponses(abciResponses *ABCIResponses) { // Responses are indexed by height so they can also be loaded later to produce Merkle proofs.
s.db.SetSync(abciResponsesKey, abciResponses.Bytes()) func (s *State) SaveABCIResponses(height int64, abciResponses *ABCIResponses) {
s.db.SetSync(calcABCIResponsesKey(height), abciResponses.Bytes())
} }
// LoadABCIResponses loads the ABCIResponses from the database. // LoadABCIResponses loads the ABCIResponses for the given height from the database.
// This is useful for recovering from crashes where we called app.Commit and before we called // This is useful for recovering from crashes where we called app.Commit and before we called
// s.Save() // s.Save(). It can also be used to produce Merkle proofs of the result of txs.
func (s *State) LoadABCIResponses() *ABCIResponses { func (s *State) LoadABCIResponses(height int64) (*ABCIResponses, error) {
buf := s.db.Get(abciResponsesKey) buf := s.db.Get(calcABCIResponsesKey(height))
if len(buf) == 0 { if len(buf) == 0 {
return nil return nil, ErrNoABCIResponsesForHeight{height}
} }
abciResponses := new(ABCIResponses) abciResponses := new(ABCIResponses)
@ -197,7 +193,7 @@ func (s *State) LoadABCIResponses() *ABCIResponses {
} }
// TODO: ensure that buf is completely read. // TODO: ensure that buf is completely read.
return abciResponses return abciResponses, nil
} }
// LoadValidators loads the ValidatorSet for a given height. // LoadValidators loads the ValidatorSet for a given height.
@ -308,39 +304,6 @@ func (s *State) saveConsensusParamsInfo() {
s.db.SetSync(calcConsensusParamsKey(nextHeight), paramsInfo.Bytes()) s.db.SetSync(calcConsensusParamsKey(nextHeight), paramsInfo.Bytes())
} }
// LoadResults loads the types.ABCIResults for a given height.
func (s *State) LoadResults(height int64) (types.ABCIResults, error) {
resInfo := s.loadResults(height)
if resInfo == nil {
return nil, ErrNoResultsForHeight{height}
}
return resInfo, nil
}
func (s *State) loadResults(height int64) types.ABCIResults {
buf := s.db.Get(calcResultsKey(height))
if len(buf) == 0 {
return nil
}
v := new(types.ABCIResults)
err := wire.ReadBinaryBytes(buf, v)
if err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
cmn.Exit(cmn.Fmt(`LoadResults: Data has been corrupted or its spec has changed:
%v\n`, err))
}
return *v
}
// saveResults persists the results for the last block to disk.
// It should be called from s.Save(), right before the state itself is persisted.
func (s *State) saveResults() {
nextHeight := s.LastBlockHeight + 1
results := s.LastResults
s.db.SetSync(calcResultsKey(nextHeight), results.Bytes())
}
// Equals returns true if the States are identical. // Equals returns true if the States are identical.
func (s *State) Equals(s2 *State) bool { func (s *State) Equals(s2 *State) bool {
return bytes.Equal(s.Bytes(), s2.Bytes()) return bytes.Equal(s.Bytes(), s2.Bytes())
@ -393,7 +356,7 @@ func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader typ
header.Time, header.Time,
nextValSet, nextValSet,
nextParams, nextParams,
types.NewResults(abciResponses.DeliverTx)) abciResponses.ResultsHash())
return nil return nil
} }
@ -401,7 +364,7 @@ func (s *State) setBlockAndValidators(height int64,
newTxs int64, blockID types.BlockID, blockTime time.Time, newTxs int64, blockID types.BlockID, blockTime time.Time,
valSet *types.ValidatorSet, valSet *types.ValidatorSet,
params types.ConsensusParams, params types.ConsensusParams,
results types.ABCIResults) { resultsHash []byte) {
s.LastBlockHeight = height s.LastBlockHeight = height
s.LastBlockTotalTx += newTxs s.LastBlockTotalTx += newTxs
@ -414,8 +377,7 @@ func (s *State) setBlockAndValidators(height int64,
s.LastConsensusParams = s.ConsensusParams s.LastConsensusParams = s.ConsensusParams
s.ConsensusParams = params s.ConsensusParams = params
s.LastResults = results s.LastResultHash = resultsHash
s.LastResultHash = results.Hash()
} }
// GetValidators returns the last and current validator sets. // GetValidators returns the last and current validator sets.
@ -425,23 +387,18 @@ func (s *State) GetValidators() (last *types.ValidatorSet, current *types.Valida
//------------------------------------------------------------------------ //------------------------------------------------------------------------
// ABCIResponses retains the responses of the various ABCI calls during block processing. // ABCIResponses retains the deterministic components of the responses
// of the various ABCI calls during block processing.
// It is persisted to disk before calling Commit. // It is persisted to disk before calling Commit.
type ABCIResponses struct { type ABCIResponses struct {
Height int64
DeliverTx []*abci.ResponseDeliverTx DeliverTx []*abci.ResponseDeliverTx
EndBlock *abci.ResponseEndBlock EndBlock *abci.ResponseEndBlock
txs types.Txs // reference for indexing results by hash
} }
// NewABCIResponses returns a new ABCIResponses // NewABCIResponses returns a new ABCIResponses
func NewABCIResponses(block *types.Block) *ABCIResponses { func NewABCIResponses(block *types.Block) *ABCIResponses {
return &ABCIResponses{ return &ABCIResponses{
Height: block.Height,
DeliverTx: make([]*abci.ResponseDeliverTx, block.NumTxs), DeliverTx: make([]*abci.ResponseDeliverTx, block.NumTxs),
txs: block.Data.Txs,
} }
} }
@ -450,6 +407,11 @@ func (a *ABCIResponses) Bytes() []byte {
return wire.BinaryBytes(*a) return wire.BinaryBytes(*a)
} }
func (a *ABCIResponses) ResultsHash() []byte {
results := types.NewResults(a.DeliverTx)
return results.Hash()
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// ValidatorsInfo represents the latest validator set, or the last height it changed // ValidatorsInfo represents the latest validator set, or the last height it changed

View File

@ -68,7 +68,7 @@ func TestStateSaveLoad(t *testing.T) {
} }
// TestABCIResponsesSaveLoad tests saving and loading ABCIResponses. // TestABCIResponsesSaveLoad tests saving and loading ABCIResponses.
func TestABCIResponsesSaveLoad(t *testing.T) { func TestABCIResponsesSaveLoad1(t *testing.T) {
tearDown, _, state := setupTestCase(t) tearDown, _, state := setupTestCase(t)
defer tearDown(t) defer tearDown(t)
// nolint: vetshadow // nolint: vetshadow
@ -87,15 +87,84 @@ func TestABCIResponsesSaveLoad(t *testing.T) {
Power: 10, Power: 10,
}, },
}} }}
abciResponses.txs = nil
state.SaveABCIResponses(abciResponses) state.SaveABCIResponses(block.Height, abciResponses)
loadedAbciResponses := state.LoadABCIResponses() loadedAbciResponses, err := state.LoadABCIResponses(block.Height)
assert.Nil(err)
assert.Equal(abciResponses, loadedAbciResponses, assert.Equal(abciResponses, loadedAbciResponses,
cmn.Fmt(`ABCIResponses don't match: Got %v, Expected %v`, loadedAbciResponses, cmn.Fmt(`ABCIResponses don't match: Got %v, Expected %v`, loadedAbciResponses,
abciResponses)) abciResponses))
} }
// TestResultsSaveLoad tests saving and loading abci results.
func TestABCIResponsesSaveLoad2(t *testing.T) {
tearDown, _, state := setupTestCase(t)
defer tearDown(t)
// nolint: vetshadow
assert := assert.New(t)
cases := [...]struct {
// height is implied index+2
// as block 1 is created from genesis
added []*abci.ResponseDeliverTx
expected types.ABCIResults
}{
0: {
[]*abci.ResponseDeliverTx{},
types.ABCIResults{},
},
1: {
[]*abci.ResponseDeliverTx{
{Code: 32, Data: []byte("Hello"), Log: "Huh?"},
},
types.ABCIResults{
{32, []byte("Hello")},
}},
2: {
[]*abci.ResponseDeliverTx{
{Code: 383},
{Data: []byte("Gotcha!"),
Tags: []*abci.KVPair{
abci.KVPairInt("a", 1),
abci.KVPairString("build", "stuff"),
}},
},
types.ABCIResults{
{383, []byte{}},
{0, []byte("Gotcha!")},
}},
3: {
nil,
types.ABCIResults{},
},
}
// query all before, should return error
for i := range cases {
h := int64(i + 1)
res, err := state.LoadABCIResponses(h)
assert.Error(err, "%d: %#v", i, res)
}
// add all cases
for i, tc := range cases {
h := int64(i + 1) // last block height, one below what we save
responses := &ABCIResponses{
DeliverTx: tc.added,
EndBlock: &abci.ResponseEndBlock{},
}
state.SaveABCIResponses(h, responses)
}
// query all before, should return expected value
for i, tc := range cases {
h := int64(i + 1)
res, err := state.LoadABCIResponses(h)
assert.NoError(err, "%d", i)
assert.Equal(tc.expected.Hash(), res.ResultsHash(), "%d", i)
}
}
// TestValidatorSimpleSaveLoad tests saving and loading validators. // TestValidatorSimpleSaveLoad tests saving and loading validators.
func TestValidatorSimpleSaveLoad(t *testing.T) { func TestValidatorSimpleSaveLoad(t *testing.T) {
tearDown, _, state := setupTestCase(t) tearDown, _, state := setupTestCase(t)
@ -279,73 +348,6 @@ func TestConsensusParamsChangesSaveLoad(t *testing.T) {
} }
} }
// TestResultsSaveLoad tests saving and loading abci results.
func TestResultsSaveLoad(t *testing.T) {
tearDown, _, state := setupTestCase(t)
defer tearDown(t)
// nolint: vetshadow
assert := assert.New(t)
cases := [...]struct {
// height is implied index+2
// as block 1 is created from genesis
added []*abci.ResponseDeliverTx
expected types.ABCIResults
}{
0: {
[]*abci.ResponseDeliverTx{},
types.ABCIResults{},
},
1: {
[]*abci.ResponseDeliverTx{
{Code: 32, Data: []byte("Hello"), Log: "Huh?"},
},
types.ABCIResults{
{32, []byte("Hello")},
}},
2: {
[]*abci.ResponseDeliverTx{
{Code: 383},
{Data: []byte("Gotcha!"),
Tags: []*abci.KVPair{
abci.KVPairInt("a", 1),
abci.KVPairString("build", "stuff"),
}},
},
types.ABCIResults{
{383, []byte{}},
{0, []byte("Gotcha!")},
}},
3: {
nil,
types.ABCIResults{},
},
}
// query all before, should return error
for i := range cases {
h := int64(i + 2)
res, err := state.LoadResults(h)
assert.Error(err, "%d: %#v", i, res)
}
// add all cases
for i, tc := range cases {
h := int64(i + 1) // last block height, one below what we save
header, parts, responses := makeHeaderPartsResults(state, h, tc.added)
state.SetBlockAndValidators(header, parts, responses)
state.Save()
}
// query all before, should return expected value
for i, tc := range cases {
h := int64(i + 2)
res, err := state.LoadResults(h)
assert.NoError(err, "%d", i)
assert.Equal(tc.expected, res, "%d", i)
}
}
func makeParams(blockBytes, blockTx, blockGas, txBytes, func makeParams(blockBytes, blockTx, blockGas, txBytes,
txGas, partSize int) types.ConsensusParams { txGas, partSize int) types.ConsensusParams {
@ -488,7 +490,6 @@ func makeHeaderPartsResponsesValPubKeyChange(state *State, height int64,
block := makeBlock(state, height) block := makeBlock(state, height)
abciResponses := &ABCIResponses{ abciResponses := &ABCIResponses{
Height: height,
EndBlock: &abci.ResponseEndBlock{ValidatorUpdates: []*abci.Validator{}}, EndBlock: &abci.ResponseEndBlock{ValidatorUpdates: []*abci.Validator{}},
} }
@ -533,7 +534,6 @@ func makeHeaderPartsResponsesParams(state *State, height int64,
block := makeBlock(state, height) block := makeBlock(state, height)
abciResponses := &ABCIResponses{ abciResponses := &ABCIResponses{
Height: height,
EndBlock: &abci.ResponseEndBlock{ConsensusParamUpdates: types.TM2PB.ConsensusParams(&params)}, EndBlock: &abci.ResponseEndBlock{ConsensusParamUpdates: types.TM2PB.ConsensusParams(&params)},
} }
return block.Header, types.PartSetHeader{}, abciResponses return block.Header, types.PartSetHeader{}, abciResponses
@ -549,7 +549,6 @@ func makeHeaderPartsResults(state *State, height int64,
block := makeBlock(state, height) block := makeBlock(state, height)
abciResponses := &ABCIResponses{ abciResponses := &ABCIResponses{
Height: height,
DeliverTx: results, DeliverTx: results,
EndBlock: &abci.ResponseEndBlock{}, EndBlock: &abci.ResponseEndBlock{},
} }

View File

@ -149,7 +149,7 @@ type Header struct {
LastCommitHash data.Bytes `json:"last_commit_hash"` // commit from validators from the last block LastCommitHash data.Bytes `json:"last_commit_hash"` // commit from validators from the last block
DataHash data.Bytes `json:"data_hash"` // transactions DataHash data.Bytes `json:"data_hash"` // transactions
// hashes from the app // hashes from the app output from the prev block
ValidatorsHash data.Bytes `json:"validators_hash"` // validators for the current block ValidatorsHash data.Bytes `json:"validators_hash"` // validators for the current block
ConsensusHash data.Bytes `json:"consensus_hash"` // consensus params for current block ConsensusHash data.Bytes `json:"consensus_hash"` // consensus params for current block
AppHash data.Bytes `json:"app_hash"` // state after txs from the previous block AppHash data.Bytes `json:"app_hash"` // state after txs from the previous block

View File

@ -13,14 +13,13 @@ import (
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// ABCIResult is just the essential info to prove // ABCIResult is the deterministic component of a ResponseDeliverTx.
// success/failure of a DeliverTx
type ABCIResult struct { type ABCIResult struct {
Code uint32 `json:"code"` Code uint32 `json:"code"`
Data data.Bytes `json:"data"` Data data.Bytes `json:"data"`
} }
// Hash creates a canonical json hash of the ABCIResult // Hash returns the canonical json hash of the ABCIResult
func (a ABCIResult) Hash() []byte { func (a ABCIResult) Hash() []byte {
// stupid canonical json output, easy to check in any language // stupid canonical json output, easy to check in any language
bs := fmt.Sprintf(`{"code":%d,"data":"%s"}`, a.Code, a.Data) bs := fmt.Sprintf(`{"code":%d,"data":"%s"}`, a.Code, a.Data)
@ -36,14 +35,18 @@ type ABCIResults []ABCIResult
func NewResults(del []*abci.ResponseDeliverTx) ABCIResults { func NewResults(del []*abci.ResponseDeliverTx) ABCIResults {
res := make(ABCIResults, len(del)) res := make(ABCIResults, len(del))
for i, d := range del { for i, d := range del {
res[i] = ABCIResult{ res[i] = NewResultFromResponse(d)
Code: d.Code,
Data: d.Data,
}
} }
return res return res
} }
func NewResultFromResponse(response *abci.ResponseDeliverTx) ABCIResult {
return ABCIResult{
Code: response.Code,
Data: response.Data,
}
}
// Bytes serializes the ABCIResponse using go-wire // Bytes serializes the ABCIResponse using go-wire
func (a ABCIResults) Bytes() []byte { func (a ABCIResults) Bytes() []byte {
return wire.BinaryBytes(a) return wire.BinaryBytes(a)