From 8718bd52a42e5bca50f4f8525fe755a4c8940638 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 5 Nov 2014 03:08:29 -0800 Subject: [PATCH] made ValidatorSet.Hash deterministic; fix off-by-1 bugs --- blocks/part_set.go | 10 +-- blocks/signature.go | 7 +- blocks/store.go | 2 + blocks/vote.go | 2 +- consensus/reactor.go | 2 +- consensus/state.go | 11 ++- mempool/mempool.go | 4 +- merkle/simple_tree.go | 33 +++---- p2p/switch.go | 6 +- state/state.go | 5 +- state/state_test.go | 7 +- state/validator.go | 22 +++-- state/validator_set.go | 196 +++++++++++++++++++++++++++++------------ 13 files changed, 207 insertions(+), 100 deletions(-) diff --git a/blocks/part_set.go b/blocks/part_set.go index fd39a24e..7254a38f 100644 --- a/blocks/part_set.go +++ b/blocks/part_set.go @@ -102,7 +102,7 @@ func (psh PartSetHeader) WriteTo(w io.Writer) (n int64, err error) { } func (psh PartSetHeader) String() string { - return fmt.Sprintf("PartSet{%X/%v}", psh.Hash, psh.Total) + return fmt.Sprintf("PartSet{T:%v %X}", psh.Total, Fingerprint(psh.Hash)) } func (psh PartSetHeader) IsZero() bool { @@ -143,13 +143,13 @@ func NewPartSetFromData(data []byte) *PartSet { partsBitArray.SetIndex(uint(i), true) } // Compute merkle trails - hashTree := merkle.HashTreeFromHashables(parts_) + trails, rootTrail := merkle.HashTrailsFromHashables(parts_) for i := 0; i < total; i++ { - parts[i].Trail = merkle.HashTrailForIndex(hashTree, i) + parts[i].Trail = trails[i].Flatten() } return &PartSet{ total: uint16(total), - hash: hashTree[len(hashTree)/2], + hash: rootTrail.Hash, parts: parts, partsBitArray: partsBitArray, count: uint16(total), @@ -236,7 +236,7 @@ func (ps *PartSet) AddPart(part *Part) (bool, error) { } // Check hash trail - if !merkle.VerifyHashTrailForIndex(int(part.Index), part.Hash(), part.Trail, ps.hash) { + if !merkle.VerifyHashTrail(uint(part.Index), uint(ps.total), part.Hash(), part.Trail, ps.hash) { return false, ErrPartSetInvalidTrail } diff --git a/blocks/signature.go b/blocks/signature.go index 4e7d02e6..6f90ffd4 100644 --- a/blocks/signature.go +++ b/blocks/signature.go @@ -5,6 +5,7 @@ import ( "io" . "github.com/tendermint/tendermint/binary" + . "github.com/tendermint/tendermint/common" ) type Signable interface { @@ -38,7 +39,7 @@ func (sig Signature) WriteTo(w io.Writer) (n int64, err error) { } func (sig Signature) String() string { - return fmt.Sprintf("Signature{%v:%X}", sig.SignerId, sig.Bytes) + return fmt.Sprintf("Signature{Id:%v %X}", sig.SignerId, Fingerprint(sig.Bytes)) } //------------------------------------- @@ -85,6 +86,10 @@ func (rsig RoundSignature) IsZero() bool { return rsig.Round == 0 && rsig.SignerId == 0 && len(rsig.Bytes) == 0 } +func (rsig RoundSignature) String() string { + return fmt.Sprintf("RoundSignature{R:%v Id:%v %X}", rsig.Round, rsig.SignerId, Fingerprint(rsig.Bytes)) +} + //------------------------------------- func ReadRoundSignatures(r io.Reader, n *int64, err *error) (rsigs []RoundSignature) { diff --git a/blocks/store.go b/blocks/store.go index 06062393..67bba864 100644 --- a/blocks/store.go +++ b/blocks/store.go @@ -116,6 +116,8 @@ func (bs *BlockStore) SaveBlock(block *Block, blockParts *PartSet) { bs.db.Set(calcBlockValidationKey(height), validationBytes) // Save new BlockStoreJSON descriptor BlockStoreJSON{Height: height}.Save(bs.db) + // Done! + bs.height = height } func (bs *BlockStore) saveBlockPart(height uint32, index uint16, part *Part) { diff --git a/blocks/vote.go b/blocks/vote.go index 07adece5..c44ffd27 100644 --- a/blocks/vote.go +++ b/blocks/vote.go @@ -80,5 +80,5 @@ func (v *Vote) String() string { panic("Unknown vote type") } - return fmt.Sprintf("%v{%v/%v:%X:%v:%v}", typeString, v.Height, v.Round, Fingerprint(v.BlockHash), v.BlockParts, v.SignerId) + return fmt.Sprintf("%v{%v/%v %X#%v %v}", typeString, v.Height, v.Round, Fingerprint(v.BlockHash), v.BlockParts, v.Signature) } diff --git a/consensus/reactor.go b/consensus/reactor.go index 7076bce3..e009036e 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -744,7 +744,7 @@ func (m *PartMessage) WriteTo(w io.Writer) (n int64, err error) { } func (m *PartMessage) String() string { - return fmt.Sprintf("[Part %v/%v T:%X]", m.Height, m.Round, m.Type) + return fmt.Sprintf("[Part %v/%v T:%X %v]", m.Height, m.Round, m.Type, m.Part) } //------------------------------------- diff --git a/consensus/state.go b/consensus/state.go index d8dfd6d9..d798487a 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -529,7 +529,7 @@ func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) { validation = cs.LastCommits.MakeValidation() } } - txs, state := cs.mempool.GetProposalTxs() // TODO: cache state + txs := cs.mempool.GetProposalTxs() block = &Block{ Header: &Header{ Network: Config.Network, @@ -538,13 +538,18 @@ func (cs *ConsensusState) RunActionPropose(height uint32, round uint16) { Fees: 0, // TODO fees LastBlockHash: cs.state.LastBlockHash, LastBlockParts: cs.state.LastBlockParts, - StateHash: state.Hash(), + StateHash: nil, // Will set afterwards. }, Validation: validation, Data: &Data{ Txs: txs, }, } + + // Set the block.Header.StateHash. + // TODO: we could cache the resulting state to cs.stagedState. + cs.state.Copy().AppendBlock(block, PartSetHeader{}, false) + blockParts = NewPartSetFromData(BinaryBytes(block)) pol = cs.LockedPOL // If exists, is a PoUnlock. } @@ -586,6 +591,7 @@ func (cs *ConsensusState) RunActionPrevote(height uint32, round uint16) { // If ProposalBlock is nil, prevote nil. if cs.ProposalBlock == nil { + log.Warning("ProposalBlock is nil") cs.signAddVote(VoteTypePrevote, nil, PartSetHeader{}) return } @@ -594,6 +600,7 @@ func (cs *ConsensusState) RunActionPrevote(height uint32, round uint16) { err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts) if err != nil { // ProposalBlock is invalid, prevote nil. + log.Warning("ProposalBlock is invalid: %v", err) cs.signAddVote(VoteTypePrevote, nil, PartSetHeader{}) return } diff --git a/mempool/mempool.go b/mempool/mempool.go index 37a3685b..284f0458 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -41,10 +41,10 @@ func (mem *Mempool) AddTx(tx Tx) (err error) { } } -func (mem *Mempool) GetProposalTxs() ([]Tx, *state.State) { +func (mem *Mempool) GetProposalTxs() []Tx { mem.mtx.Lock() defer mem.mtx.Unlock() - return mem.txs, mem.state + return mem.txs } // "block" is the new block being committed. diff --git a/merkle/simple_tree.go b/merkle/simple_tree.go index 8c6491f1..b3a6331c 100644 --- a/merkle/simple_tree.go +++ b/merkle/simple_tree.go @@ -1,23 +1,24 @@ /* Computes a deterministic minimal height merkle tree hash. If the number of items is not a power of two, some leaves -will be at different levels, leaning left. +will be at different levels. Tries to keep both sides of +the tree the same size, but the left may be one greater. -Use this for shorter deterministic trees, such as the validator list. +Use this for short deterministic trees, such as the validator list. For larger datasets, use IAVLTree. - * - / \ - / \ - / \ - / \ - * * - / \ / \ - / \ / \ - / \ / \ - * * * h6 - / \ / \ / \ - h0 h1 h2 h3 h4 h5 + * + / \ + / \ + / \ + / \ + * * + / \ / \ + / \ / \ + / \ / \ + * * * h6 + / \ / \ / \ + h0 h1 h2 h3 h4 h5 */ @@ -46,7 +47,7 @@ func HashFromHashes(hashes [][]byte) []byte { // Recursive impl. switch len(hashes) { case 0: - panic("Cannot call HashFromHashes() with 0 length arg") + return nil case 1: return hashes[0] default: @@ -110,7 +111,7 @@ func HashTrailsFromHashables(items []Hashable) (trails []*HashTrail, root *HashT // Recursive impl. switch len(items) { case 0: - panic("Cannot call HashTrailsFromHashables() with 0 length arg") + return nil, nil case 1: trail := &HashTrail{items[0].Hash(), nil, nil, nil} return []*HashTrail{trail}, trail diff --git a/p2p/switch.go b/p2p/switch.go index 6242a89e..104f6921 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -172,8 +172,10 @@ func (sw *Switch) Broadcast(chId byte, msg Binary) (numSuccess, numFailure int) log.Debug("[%X] Broadcast: %v", chId, msg) for _, peer := range sw.peers.List() { - success := peer.TrySend(chId, msg) - // log.Debug("Broadcast for peer %v success: %v", peer, success) + // XXX XXX Change. + // success := peer.TrySend(chId, msg) + success := peer.Send(chId, msg) + log.Debug("[%X] for peer %v success: %v", chId, peer, success) if success { numSuccess += 1 } else { diff --git a/state/state.go b/state/state.go index a882f10e..e8feebd3 100644 --- a/state/state.go +++ b/state/state.go @@ -317,7 +317,7 @@ func (s *State) AppendBlock(block *Block, blockPartsHeader PartSetHeader, checkS return true } vote := &Vote{ - Height: block.Height, + Height: block.Height - 1, Round: rsig.Round, Type: VoteTypeCommit, BlockHash: block.LastBlockHash, @@ -328,6 +328,7 @@ func (s *State) AppendBlock(block *Block, blockPartsHeader PartSetHeader, checkS sumVotingPower += val.VotingPower return false } else { + log.Warning("Invalid validation signature.\nval: %v\nvote: %v", val, vote) err = errors.New("Invalid validation signature") return true } @@ -355,7 +356,7 @@ func (s *State) AppendBlock(block *Block, blockPartsHeader PartSetHeader, checkS if val == nil { return ErrStateInvalidSignature } - val.LastCommitHeight = block.Height + val.LastCommitHeight = block.Height - 1 updated := s.BondedValidators.Update(val) if !updated { panic("Failed to update validator LastCommitHeight") diff --git a/state/state_test.go b/state/state_test.go index 85d4bdc5..56065c25 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -145,11 +145,8 @@ func TestGenesisSaveLoad(t *testing.T) { if s0.BondedValidators.TotalVotingPower() != s1.BondedValidators.TotalVotingPower() { t.Error("BondedValidators TotalVotingPower mismatch") } - if bytes.Equal(s0.BondedValidators.Hash(), s1.BondedValidators.Hash()) { - // The BondedValidators hash should have changed because - // each AppendBlock() calls IncrementAccum(), - // changing each validator's Accum. - t.Error("BondedValidators hash should have changed") + if !bytes.Equal(s0.BondedValidators.Hash(), s1.BondedValidators.Hash()) { + t.Error("BondedValidators hash mismatch") } if s0.UnbondingValidators.Size() != s1.UnbondingValidators.Size() { t.Error("UnbondingValidators Size mismatch") diff --git a/state/validator.go b/state/validator.go index 999060d2..c49aa097 100644 --- a/state/validator.go +++ b/state/validator.go @@ -33,14 +33,8 @@ func ReadValidator(r io.Reader, n *int64, err *error) *Validator { // Creates a new copy of the validator so we can mutate accum. func (v *Validator) Copy() *Validator { - return &Validator{ - Account: v.Account, - BondHeight: v.BondHeight, - UnbondHeight: v.UnbondHeight, - LastCommitHeight: v.LastCommitHeight, - VotingPower: v.VotingPower, - Accum: v.Accum, - } + vCopy := *v + return &vCopy } // Used to persist the state of ConsensusStateControl. @@ -75,7 +69,17 @@ func (v *Validator) CompareAccum(other *Validator) *Validator { } func (v *Validator) String() string { - return fmt.Sprintf("Validator{%v VP:%v A:%v}", v.Account, v.VotingPower, v.Accum) + return fmt.Sprintf("Validator{%v %v-%v-%v VP:%v A:%v}", + v.Account, + v.BondHeight, + v.LastCommitHeight, + v.UnbondHeight, + v.VotingPower, + v.Accum) +} + +func (v *Validator) Hash() []byte { + return BinaryHash(v) } //------------------------------------- diff --git a/state/validator_set.go b/state/validator_set.go index 5fa62123..9bb1ad94 100644 --- a/state/validator_set.go +++ b/state/validator_set.go @@ -3,50 +3,71 @@ package state import ( "fmt" "io" + "sort" "strings" . "github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/merkle" ) +//------------------------------------- +// Implements sort for sorting validators by id. + +type ValidatorSlice []*Validator + +func (vs ValidatorSlice) Len() int { + return len(vs) +} + +func (vs ValidatorSlice) Less(i, j int) bool { + return vs[i].Id < vs[j].Id +} + +func (vs ValidatorSlice) Swap(i, j int) { + it := vs[i] + vs[i] = vs[j] + vs[j] = it +} + +//------------------------------------- + // Not goroutine-safe. // TODO: consider validator Accum overflow? +// TODO: replace validators []*Validator with github.com/jaekwon/go-ibbs? +// NOTE: all get/set to validators should copy the value for safety. type ValidatorSet struct { - validators merkle.Tree - proposer *Validator // Whoever has the highest Accum. + validators []*Validator + + // cache + proposer *Validator totalVotingPower uint64 } func NewValidatorSet(vals []*Validator) *ValidatorSet { - validators := merkle.NewIAVLTree(BasicCodec, ValidatorCodec, 0, nil) // In memory - var proposer *Validator - totalVotingPower := uint64(0) - for _, val := range vals { - validators.Set(val.Id, val) - proposer = proposer.CompareAccum(val) - totalVotingPower += val.VotingPower + validators := make([]*Validator, len(vals)) + for i, val := range vals { + validators[i] = val.Copy() } + sort.Sort(ValidatorSlice(validators)) return &ValidatorSet{ - validators: validators, - proposer: proposer, - totalVotingPower: totalVotingPower, + validators: validators, } } func ReadValidatorSet(r io.Reader, n *int64, err *error) *ValidatorSet { - size := ReadUInt64(r, n, err) + size := ReadUVarInt(r, n, err) validators := []*Validator{} - for i := uint64(0); i < size; i++ { + for i := uint(0); i < size; i++ { validator := ReadValidator(r, n, err) validators = append(validators, validator) } + sort.Sort(ValidatorSlice(validators)) return NewValidatorSet(validators) } func (vset *ValidatorSet) WriteTo(w io.Writer) (n int64, err error) { - WriteUInt64(w, uint64(vset.validators.Size()), &n, &err) - vset.validators.Iterate(func(key_ interface{}, val_ interface{}) bool { - val := val_.(*Validator) + WriteUVarInt(w, uint(len(vset.validators)), &n, &err) + vset.Iterate(func(index uint, val *Validator) bool { WriteBinary(w, val, &n, &err) return false }) @@ -55,85 +76,152 @@ func (vset *ValidatorSet) WriteTo(w io.Writer) (n int64, err error) { func (vset *ValidatorSet) IncrementAccum() { // Decrement from previous proposer - vset.proposer.Accum -= int64(vset.totalVotingPower) - var proposer *Validator - // Increment accum and find proposer - vset.validators.Iterate(func(key_ interface{}, val_ interface{}) bool { - val := val_.(*Validator) + oldProposer := vset.Proposer() + oldProposer.Accum -= int64(vset.TotalVotingPower()) + vset.Update(oldProposer) + var newProposer *Validator + // Increment accum and find new proposer + // NOTE: updates validators in place. + for _, val := range vset.validators { val.Accum += int64(val.VotingPower) - proposer = proposer.CompareAccum(val) - return false - }) - vset.proposer = proposer + newProposer = newProposer.CompareAccum(val) + } + vset.proposer = newProposer } func (vset *ValidatorSet) Copy() *ValidatorSet { + validators := make([]*Validator, len(vset.validators)) + for i, val := range vset.validators { + // NOTE: must copy, since IncrementAccum updates in place. + validators[i] = val.Copy() + } return &ValidatorSet{ - validators: vset.validators.Copy(), + validators: validators, proposer: vset.proposer, totalVotingPower: vset.totalVotingPower, } } -func (vset *ValidatorSet) GetById(id uint64) (index uint, val *Validator) { - index_, val_ := vset.validators.Get(id) - index, val = uint(index_), val_.(*Validator) - return +func (vset *ValidatorSet) HasId(id uint64) bool { + idx := sort.Search(len(vset.validators), func(i int) bool { + return id <= vset.validators[i].Id + }) + return idx != len(vset.validators) && vset.validators[idx].Id == id } -func (vset *ValidatorSet) HasId(id uint64) bool { - _, val_ := vset.validators.Get(id) - return val_ != nil +func (vset *ValidatorSet) GetById(id uint64) (index uint, val *Validator) { + idx := sort.Search(len(vset.validators), func(i int) bool { + return id <= vset.validators[i].Id + }) + if idx != len(vset.validators) && vset.validators[idx].Id == id { + return uint(idx), vset.validators[idx].Copy() + } else { + return 0, nil + } } func (vset *ValidatorSet) GetByIndex(index uint) (id uint64, val *Validator) { - id_, val_ := vset.validators.GetByIndex(uint64(index)) - id, val = id_.(uint64), val_.(*Validator) - return + val = vset.validators[index] + return val.Id, val.Copy() } func (vset *ValidatorSet) Size() uint { - return uint(vset.validators.Size()) + return uint(len(vset.validators)) } func (vset *ValidatorSet) TotalVotingPower() uint64 { + if vset.totalVotingPower == 0 { + for _, val := range vset.validators { + vset.totalVotingPower += val.VotingPower + } + } return vset.totalVotingPower } func (vset *ValidatorSet) Proposer() (proposer *Validator) { - return vset.proposer + if vset.proposer == nil { + for _, val := range vset.validators { + vset.proposer = vset.proposer.CompareAccum(val) + } + } + return vset.proposer.Copy() } func (vset *ValidatorSet) Hash() []byte { - return vset.validators.Hash() + if len(vset.validators) == 0 { + return nil + } + hashables := make([]merkle.Hashable, len(vset.validators)) + for i, val := range vset.validators { + hashables[i] = val + } + return merkle.HashFromHashables(hashables) } func (vset *ValidatorSet) Add(val *Validator) (added bool) { - if vset.validators.Has(val.Id) { + val = val.Copy() + idx := sort.Search(len(vset.validators), func(i int) bool { + return val.Id <= vset.validators[i].Id + }) + if idx == len(vset.validators) { + vset.validators = append(vset.validators, val) + // Invalidate cache + vset.proposer = nil + vset.totalVotingPower = 0 + return true + } else if vset.validators[idx].Id == val.Id { return false + } else { + newValidators := append(vset.validators[:idx], val) + newValidators = append(newValidators, vset.validators[idx:]...) + vset.validators = newValidators + // Invalidate cache + vset.proposer = nil + vset.totalVotingPower = 0 + return true } - return !vset.validators.Set(val.Id, val) } func (vset *ValidatorSet) Update(val *Validator) (updated bool) { - if !vset.validators.Has(val.Id) { + index, sameVal := vset.GetById(val.Id) + if sameVal == nil { return false + } else { + vset.validators[index] = val.Copy() + // Invalidate cache + vset.proposer = nil + vset.totalVotingPower = 0 + return true } - return vset.validators.Set(val.Id, val) } -func (vset *ValidatorSet) Remove(validatorId uint64) (val *Validator, removed bool) { - val_, removed := vset.validators.Remove(validatorId) - return val_.(*Validator), removed +func (vset *ValidatorSet) Remove(id uint64) (val *Validator, removed bool) { + idx := sort.Search(len(vset.validators), func(i int) bool { + return id <= vset.validators[i].Id + }) + if idx == len(vset.validators) || vset.validators[idx].Id != id { + return nil, false + } else { + removedVal := vset.validators[idx] + newValidators := vset.validators[:idx] + if idx+1 < len(vset.validators) { + newValidators = append(newValidators, vset.validators[idx+1:]...) + } + vset.validators = newValidators + // Invalidate cache + vset.proposer = nil + vset.totalVotingPower = 0 + return removedVal, true + } } func (vset *ValidatorSet) Iterate(fn func(index uint, val *Validator) bool) { - index := uint(0) - vset.validators.Iterate(func(key_ interface{}, val_ interface{}) bool { - stop := fn(index, val_.(*Validator)) - index++ - return stop - }) + for i, val := range vset.validators { + stop := fn(uint(i), val.Copy()) + if stop { + break + } + } } func (vset *ValidatorSet) String() string { @@ -151,7 +239,7 @@ func (vset *ValidatorSet) StringWithIndent(indent string) string { %s Validators: %s %v %s}`, - indent, vset.proposer.String(), + indent, vset.Proposer().String(), indent, indent, strings.Join(valStrings, "\n"+indent+" "), indent)