broadcastVote sets peer's vote bitarray

This commit is contained in:
Jae Kwon
2014-10-26 00:47:40 -07:00
parent 3df3107479
commit 0f484b6315
6 changed files with 43 additions and 37 deletions

View File

@ -101,7 +101,7 @@ func TestBlock(t *testing.T) {
expectChange(func(b *Block) { b.Header.StateHash = RandBytes(32) }, "Expected hash to depend on StateHash") expectChange(func(b *Block) { b.Header.StateHash = RandBytes(32) }, "Expected hash to depend on StateHash")
expectChange(func(b *Block) { b.Validation.Signatures[0].SignerId += 1 }, "Expected hash to depend on Validation Signature") expectChange(func(b *Block) { b.Validation.Signatures[0].SignerId += 1 }, "Expected hash to depend on Validation Signature")
expectChange(func(b *Block) { b.Validation.Signatures[0].Bytes = RandBytes(32) }, "Expected hash to depend on Validation Signature") expectChange(func(b *Block) { b.Validation.Signatures[0].Bytes = RandBytes(32) }, "Expected hash to depend on Validation Signature")
expectChange(func(b *Block) { b.Data.Txs[0].(*SendTx).SignerId += 1 }, "Expected hash to depend on tx Signature") expectChange(func(b *Block) { b.Data.Txs[0].(*SendTx).Signature.SignerId += 1 }, "Expected hash to depend on tx Signature")
expectChange(func(b *Block) { b.Data.Txs[0].(*SendTx).Amount += 1 }, "Expected hash to depend on send tx Amount") expectChange(func(b *Block) { b.Data.Txs[0].(*SendTx).Amount += 1 }, "Expected hash to depend on send tx Amount")
// Write the block, read it in again, check hash. // Write the block, read it in again, check hash.

View File

@ -15,7 +15,7 @@ const (
) )
var ( var (
ErrVoteUnexpectedPhase = errors.New("Unexpected phase") ErrVoteUnexpectedStep = errors.New("Unexpected step")
ErrVoteInvalidAccount = errors.New("Invalid round vote account") ErrVoteInvalidAccount = errors.New("Invalid round vote account")
ErrVoteInvalidSignature = errors.New("Invalid round vote signature") ErrVoteInvalidSignature = errors.New("Invalid round vote signature")
ErrVoteInvalidBlockHash = errors.New("Invalid block hash") ErrVoteInvalidBlockHash = errors.New("Invalid block hash")
@ -65,7 +65,7 @@ func (v *Vote) String() string {
} }
switch v.Type { switch v.Type {
case VoteTypePrevote: case VoteTypePrevote:
return fmt.Sprintf("Vote{%v/%v:%X:%v}", v.Height, v.Round, blockHash, v.SignerId) return fmt.Sprintf("Prevote{%v/%v:%X:%v}", v.Height, v.Round, blockHash, v.SignerId)
case VoteTypePrecommit: case VoteTypePrecommit:
return fmt.Sprintf("Precommit{%v/%v:%X:%v}", v.Height, v.Round, blockHash, v.SignerId) return fmt.Sprintf("Precommit{%v/%v:%X:%v}", v.Height, v.Round, blockHash, v.SignerId)
case VoteTypeCommit: case VoteTypeCommit:

View File

@ -135,7 +135,7 @@ func (bA BitArray) Sub(o BitArray) BitArray {
} }
} }
func (bA BitArray) PickRandom() (int, bool) { func (bA BitArray) PickRandom() (uint, bool) {
length := len(bA.elems) length := len(bA.elems)
if length == 0 { if length == 0 {
return 0, false return 0, false
@ -149,7 +149,7 @@ func (bA BitArray) PickRandom() (int, bool) {
for j := 0; j < 64; j++ { for j := 0; j < 64; j++ {
bitIdx := ((j + randBitStart) % 64) bitIdx := ((j + randBitStart) % 64)
if (bA.elems[elemIdx] & (1 << uint(bitIdx))) > 0 { if (bA.elems[elemIdx] & (1 << uint(bitIdx))) > 0 {
return 64*int(elemIdx) + int(bitIdx), true return 64*uint(elemIdx) + uint(bitIdx), true
} }
} }
panic("should not happen") panic("should not happen")
@ -164,7 +164,7 @@ func (bA BitArray) PickRandom() (int, bool) {
for j := 0; j < elemBits; j++ { for j := 0; j < elemBits; j++ {
bitIdx := ((j + randBitStart) % elemBits) bitIdx := ((j + randBitStart) % elemBits)
if (bA.elems[elemIdx] & (1 << uint(bitIdx))) > 0 { if (bA.elems[elemIdx] & (1 << uint(bitIdx))) > 0 {
return 64*int(elemIdx) + int(bitIdx), true return 64*uint(elemIdx) + uint(bitIdx), true
} }
} }
} }

View File

@ -204,10 +204,8 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
switch msg_.(type) { switch msg_.(type) {
case *Proposal: case *Proposal:
proposal := msg_.(*Proposal) proposal := msg_.(*Proposal)
err = conR.conS.SetProposal(proposal)
if err != nil {
ps.SetHasProposal(proposal) ps.SetHasProposal(proposal)
} err = conR.conS.SetProposal(proposal)
case *PartMessage: case *PartMessage:
msg := msg_.(*PartMessage) msg := msg_.(*PartMessage)
@ -240,7 +238,7 @@ func (conR *ConsensusReactor) Receive(chId byte, peer *p2p.Peer, msgBytes []byte
return return
} }
ps.EnsureVoteBitArrays(rs.Height, rs.Round, rs.Validators.Size()) ps.EnsureVoteBitArrays(rs.Height, rs.Round, rs.Validators.Size())
ps.SetHasVote(rs.Height, rs.Round, vote.Type, uint32(index)) ps.SetHasVote(rs.Height, rs.Round, vote.Type, index)
added, err := conR.conS.AddVote(vote) added, err := conR.conS.AddVote(vote)
if err != nil { if err != nil {
log.Warning("Error attempting to add vote: %v", err) log.Warning("Error attempting to add vote: %v", err)
@ -397,7 +395,7 @@ ACTION_LOOP:
vote := conR.conS.RunActionPrevote(rs.Height, rs.Round) vote := conR.conS.RunActionPrevote(rs.Height, rs.Round)
broadcastNewRoundStep(RoundStepPrevote) broadcastNewRoundStep(RoundStepPrevote)
if vote != nil { if vote != nil {
conR.broadcastVote(vote) conR.broadcastVote(rs, vote)
} }
scheduleNextAction() scheduleNextAction()
continue ACTION_LOOP continue ACTION_LOOP
@ -409,7 +407,7 @@ ACTION_LOOP:
vote := conR.conS.RunActionPrecommit(rs.Height, rs.Round) vote := conR.conS.RunActionPrecommit(rs.Height, rs.Round)
broadcastNewRoundStep(RoundStepPrecommit) broadcastNewRoundStep(RoundStepPrecommit)
if vote != nil { if vote != nil {
conR.broadcastVote(vote) conR.broadcastVote(rs, vote)
} }
scheduleNextAction() scheduleNextAction()
continue ACTION_LOOP continue ACTION_LOOP
@ -430,7 +428,7 @@ ACTION_LOOP:
vote := conR.conS.RunActionCommit(rs.Height, rs.Round) vote := conR.conS.RunActionCommit(rs.Height, rs.Round)
broadcastNewRoundStep(RoundStepCommit) broadcastNewRoundStep(RoundStepCommit)
if vote != nil { if vote != nil {
conR.broadcastVote(vote) conR.broadcastVote(rs, vote)
} }
// do not schedule next action. // do not schedule next action.
continue ACTION_LOOP continue ACTION_LOOP
@ -445,7 +443,7 @@ ACTION_LOOP:
vote := conR.conS.RunActionCommit(rs.Height, rs.Round) vote := conR.conS.RunActionCommit(rs.Height, rs.Round)
broadcastNewRoundStep(RoundStepCommit) broadcastNewRoundStep(RoundStepCommit)
if vote != nil { if vote != nil {
conR.broadcastVote(vote) conR.broadcastVote(rs, vote)
} }
} }
// Wait for more commit votes. // Wait for more commit votes.
@ -471,9 +469,15 @@ ACTION_LOOP:
} }
} }
func (conR *ConsensusReactor) broadcastVote(vote *Vote) { func (conR *ConsensusReactor) broadcastVote(rs *RoundState, vote *Vote) {
// Get our validator index
index, _ := rs.Validators.GetById(vote.SignerId)
msg := p2p.TypedMessage{msgTypeVote, vote} msg := p2p.TypedMessage{msgTypeVote, vote}
conR.sw.Broadcast(VoteCh, msg) for _, peer := range conR.sw.Peers().List() {
peer.Send(VoteCh, msg)
ps := peer.Data.Get(peerStateKey).(*PeerState)
ps.SetHasVote(rs.Height, rs.Round, vote.Type, index)
}
} }
//-------------------------------------- //--------------------------------------
@ -565,18 +569,19 @@ OUTER_LOOP:
ps.EnsureVoteBitArrays(rs.Height, rs.Round, rs.Validators.Size()) ps.EnsureVoteBitArrays(rs.Height, rs.Round, rs.Validators.Size())
// If there are prevotes to send... // If there are prevotes to send...
if prs.Step <= RoundStepPrevote { if rs.Round == prs.Round && prs.Step <= RoundStepPrevote {
index, ok := rs.Prevotes.BitArray().Sub(prs.Prevotes).PickRandom() index, ok := rs.Prevotes.BitArray().Sub(prs.Prevotes).PickRandom()
if ok { if ok {
valId, val := rs.Validators.GetByIndex(uint32(index)) valId, val := rs.Validators.GetByIndex(index)
if val != nil { if val != nil {
vote := rs.Prevotes.Get(valId) vote := rs.Prevotes.Get(valId)
// NOTE: vote may be a commit
msg := p2p.TypedMessage{msgTypeVote, vote} msg := p2p.TypedMessage{msgTypeVote, vote}
peer.Send(VoteCh, msg) peer.Send(VoteCh, msg)
ps.SetHasVote(rs.Height, rs.Round, VoteTypePrevote, uint32(index)) ps.SetHasVote(rs.Height, rs.Round, VoteTypePrevote, index)
if vote.Type == VoteTypeCommit { if vote.Type == VoteTypeCommit {
ps.SetHasVote(rs.Height, rs.Round, VoteTypePrecommit, uint32(index)) ps.SetHasVote(rs.Height, rs.Round, VoteTypePrecommit, index)
ps.SetHasVote(rs.Height, rs.Round, VoteTypeCommit, uint32(index)) ps.SetHasVote(rs.Height, rs.Round, VoteTypeCommit, index)
} }
continue OUTER_LOOP continue OUTER_LOOP
} else { } else {
@ -586,17 +591,18 @@ OUTER_LOOP:
} }
// If there are precommits to send... // If there are precommits to send...
if prs.Step <= RoundStepPrecommit { if rs.Round == prs.Round && prs.Step <= RoundStepPrecommit {
index, ok := rs.Precommits.BitArray().Sub(prs.Precommits).PickRandom() index, ok := rs.Precommits.BitArray().Sub(prs.Precommits).PickRandom()
if ok { if ok {
valId, val := rs.Validators.GetByIndex(uint32(index)) valId, val := rs.Validators.GetByIndex(index)
if val != nil { if val != nil {
vote := rs.Precommits.Get(valId) vote := rs.Precommits.Get(valId)
// NOTE: vote may be a commit
msg := p2p.TypedMessage{msgTypeVote, vote} msg := p2p.TypedMessage{msgTypeVote, vote}
peer.Send(VoteCh, msg) peer.Send(VoteCh, msg)
ps.SetHasVote(rs.Height, rs.Round, VoteTypePrecommit, uint32(index)) ps.SetHasVote(rs.Height, rs.Round, VoteTypePrecommit, index)
if vote.Type == VoteTypeCommit { if vote.Type == VoteTypeCommit {
ps.SetHasVote(rs.Height, rs.Round, VoteTypeCommit, uint32(index)) ps.SetHasVote(rs.Height, rs.Round, VoteTypeCommit, index)
} }
continue OUTER_LOOP continue OUTER_LOOP
} else { } else {
@ -608,12 +614,12 @@ OUTER_LOOP:
// If there are any commits to send... // If there are any commits to send...
index, ok := rs.Commits.BitArray().Sub(prs.Commits).PickRandom() index, ok := rs.Commits.BitArray().Sub(prs.Commits).PickRandom()
if ok { if ok {
valId, val := rs.Validators.GetByIndex(uint32(index)) valId, val := rs.Validators.GetByIndex(index)
if val != nil { if val != nil {
vote := rs.Commits.Get(valId) vote := rs.Commits.Get(valId)
msg := p2p.TypedMessage{msgTypeVote, vote} msg := p2p.TypedMessage{msgTypeVote, vote}
peer.Send(VoteCh, msg) peer.Send(VoteCh, msg)
ps.SetHasVote(rs.Height, rs.Round, VoteTypeCommit, uint32(index)) ps.SetHasVote(rs.Height, rs.Round, VoteTypeCommit, index)
continue OUTER_LOOP continue OUTER_LOOP
} else { } else {
log.Error("index is not a valid validator index") log.Error("index is not a valid validator index")
@ -726,7 +732,7 @@ func (ps *PeerState) EnsureVoteBitArrays(height uint32, round uint16, numValidat
} }
} }
func (ps *PeerState) SetHasVote(height uint32, round uint16, type_ uint8, index uint32) { func (ps *PeerState) SetHasVote(height uint32, round uint16, type_ uint8, index uint) {
ps.mtx.Lock() ps.mtx.Lock()
defer ps.mtx.Unlock() defer ps.mtx.Unlock()
@ -736,11 +742,11 @@ func (ps *PeerState) SetHasVote(height uint32, round uint16, type_ uint8, index
switch type_ { switch type_ {
case VoteTypePrevote: case VoteTypePrevote:
ps.Prevotes.SetIndex(uint(index), true) ps.Prevotes.SetIndex(index, true)
case VoteTypePrecommit: case VoteTypePrecommit:
ps.Precommits.SetIndex(uint(index), true) ps.Precommits.SetIndex(index, true)
case VoteTypeCommit: case VoteTypeCommit:
ps.Commits.SetIndex(uint(index), true) ps.Commits.SetIndex(index, true)
default: default:
panic("Invalid vote type") panic("Invalid vote type")
} }

View File

@ -50,18 +50,18 @@ func NewVoteSet(height uint32, round uint16, type_ byte, vset *state.ValidatorSe
} }
// True if added, false if not. // True if added, false if not.
// Returns ErrVote[UnexpectedPhase|InvalidAccount|InvalidSignature|InvalidBlockHash|ConflictingSignature] // Returns ErrVote[UnexpectedStep|InvalidAccount|InvalidSignature|InvalidBlockHash|ConflictingSignature]
// NOTE: vote should not be mutated after adding. // NOTE: vote should not be mutated after adding.
func (vs *VoteSet) Add(vote *Vote) (bool, error) { func (vs *VoteSet) Add(vote *Vote) (bool, error) {
vs.mtx.Lock() vs.mtx.Lock()
defer vs.mtx.Unlock() defer vs.mtx.Unlock()
// Make sure the phase matches. (or that vote is commit && round < vs.round) // Make sure the step matches. (or that vote is commit && round < vs.round)
if vote.Height != vs.height || if vote.Height != vs.height ||
(vote.Type != VoteTypeCommit && vote.Round != vs.round) || (vote.Type != VoteTypeCommit && vote.Round != vs.round) ||
(vote.Type != VoteTypeCommit && vote.Type != vs.type_) || (vote.Type != VoteTypeCommit && vote.Type != vs.type_) ||
(vote.Type == VoteTypeCommit && vs.type_ != VoteTypeCommit && vote.Round >= vs.round) { (vote.Type == VoteTypeCommit && vs.type_ != VoteTypeCommit && vote.Round >= vs.round) {
return false, ErrVoteUnexpectedPhase return false, ErrVoteUnexpectedStep
} }
// Ensure that signer is a validator. // Ensure that signer is a validator.

View File

@ -75,9 +75,9 @@ func (vset *ValidatorSet) Copy() *ValidatorSet {
} }
} }
func (vset *ValidatorSet) GetById(id uint64) (index uint32, val *Validator) { func (vset *ValidatorSet) GetById(id uint64) (index uint, val *Validator) {
index_, val_ := vset.validators.Get(id) index_, val_ := vset.validators.Get(id)
index, val = uint32(index_), val_.(*Validator) index, val = uint(index_), val_.(*Validator)
return return
} }
@ -86,7 +86,7 @@ func (vset *ValidatorSet) HasId(id uint64) bool {
return val_ != nil return val_ != nil
} }
func (vset *ValidatorSet) GetByIndex(index uint32) (id uint64, val *Validator) { func (vset *ValidatorSet) GetByIndex(index uint) (id uint64, val *Validator) {
id_, val_ := vset.validators.GetByIndex(uint64(index)) id_, val_ := vset.validators.GetByIndex(uint64(index))
id, val = id_.(uint64), val_.(*Validator) id, val = id_.(uint64), val_.(*Validator)
return return