diff --git a/consensus/consensus.go b/consensus/consensus.go index 57b32cc7..f578e73b 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "math" + "math/rand" "sync" "sync/atomic" "time" @@ -29,6 +30,10 @@ const ( roundDurationDelta = 15 * time.Second // Each successive round lasts 15 seconds longer. roundDeadlineBare = float64(1.0 / 3.0) // When the bare vote is due. roundDeadlinePrecommit = float64(2.0 / 3.0) // When the precommit vote is due. + + newBlockWaitDuration = roundDuration0 / 3 // The time to wait between commitTime and startTime of next consensus rounds. + voteRankCutoff = 2 // Higher ranks --> do not send votes. + unsolicitedVoteRate = 0.01 // Probability of sending a high ranked vote. ) //----------------------------------------------------------------------------- @@ -45,10 +50,11 @@ func calcRoundStartTime(round uint16, startTime time.Time) time.Time { } // calcs the current round given startTime of round zero. +// NOTE: round is zero if startTime is in the future. func calcRound(startTime time.Time) uint16 { now := time.Now() if now.Before(startTime) { - Panicf("Cannot calc round when startTime is in the future: %v", startTime) + return 0 } // Start + D_0 * R + D_delta * (R^2 - R)/2 <= Now; find largest integer R. // D_delta * R^2 + (2D_0 - D_delta) * R + 2(Start - Now) <= 0. @@ -71,6 +77,7 @@ func calcRound(startTime time.Time) uint16 { } // convenience +// NOTE: elapsedRatio can be negative if startTime is in the future. func calcRoundInfo(startTime time.Time) (round uint16, roundStartTime time.Time, roundDuration time.Duration, roundElapsed time.Duration, elapsedRatio float64) { round = calcRound(startTime) @@ -346,7 +353,7 @@ func (cm *ConsensusManager) stageProposal(proposal *BlockPartSet) error { cm.mtx.Unlock() // Commit block onto the copied state. - err = stateCopy.CommitBlock(block, block.Header.Time) // NOTE: fake commit time. + err = stateCopy.CommitBlock(block) if err != nil { return err } @@ -409,7 +416,8 @@ func (cm *ConsensusManager) voteProposal(rs *RoundState) error { func (cm *ConsensusManager) precommitProposal(rs *RoundState) error { // If we see a 2/3 majority for votes for a block, precommit. - if hash, ok := rs.RoundBareVotes.TwoThirdsMajority(); ok { + // TODO: maybe could use commitTime here and avg it with later commitTime? + if hash, _, ok := rs.RoundBareVotes.TwoThirdsMajority(); ok { if len(hash) == 0 { // 2/3 majority voted for nil. return nil @@ -442,18 +450,18 @@ func (cm *ConsensusManager) precommitProposal(rs *RoundState) error { } // Commit or unlock. -// Call after RoundStepPrecommit, after round has expired. -func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error { +// Call after RoundStepPrecommit, after round has completely expired. +func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) (commitTime time.Time, err error) { // If there exists a 2/3 majority of precommits. // Validate the block and commit. - if hash, ok := rs.RoundPrecommits.TwoThirdsMajority(); ok { + if hash, commitTime, ok := rs.RoundPrecommits.TwoThirdsMajority(); ok { // If the proposal is invalid or we don't have it, // do not commit. // TODO If we were just late to receive the block, when // do we actually get it? Document it. if cm.stageProposal(rs.Proposal) != nil { - return nil + return time.Time{}, nil } // TODO: Remove? cm.cs.LockProposal(rs.Proposal) @@ -465,16 +473,11 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error { Hash: hash, }) if err != nil { - return err + return time.Time{}, err } // Commit block. - // XXX use adjusted commit time. - // If we just use time.Now() we're not converging - // time differences between nodes, so nodes end up drifting - // in time. - commitTime := time.Now() cm.commitProposal(rs.Proposal, commitTime) - return nil + return commitTime, nil } else { // Otherwise, if a 1/3 majority if a block that isn't our locked one exists, unlock. locked := cm.cs.LockedProposal() @@ -483,14 +486,13 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error { if hashOrNil == nil { continue } - hash := hashOrNil.([]byte) - if !bytes.Equal(hash, locked.Block().Hash()) { + if !bytes.Equal(hashOrNil, locked.Block().Hash()) { // Unlock our lock. cm.cs.LockProposal(nil) } } } - return nil + return time.Time{}, nil } } @@ -511,6 +513,7 @@ func (cm *ConsensusManager) commitProposal(proposal *BlockPartSet, commitTime ti // What was staged becomes committed. cm.state = cm.stagedState + cm.state.Save(commitTime) cm.cs.Update(cm.state) cm.stagedProposal = nil cm.stagedState = nil @@ -650,7 +653,11 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() { _, _, roundDuration, _, elapsedRatio := calcRoundInfo(rs.StartTime) switch rs.Step() { case RoundStepStart: - // It's a new RoundState, immediately wake up and xn to RoundStepProposal. + // It's a new RoundState. + if elapsedRatio < 0 { + // startTime is in the future. + time.Sleep(time.Duration(-1.0*elapsedRatio) * roundDuration) + } cm.doActionCh <- RoundAction{rs.Height, rs.Round, RoundStepProposal} case RoundStepProposal: // Wake up when it's time to vote. @@ -722,14 +729,21 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() { log.Info("Error attempting to precommit for proposal: %v", err) } } else if step == RoundStepCommitOrUnlock && rs.Step() <= RoundStepPrecommits { - err := cm.commitOrUnlockProposal(rs) + commitTime, err := cm.commitOrUnlockProposal(rs) if err != nil { log.Info("Error attempting to commit or update for proposal: %v", err) } - // Round is over. This is a special case. - // Prepare a new RoundState for the next state. - cm.cs.SetupRound(rs.Round + 1) - return // setAlarm() takes care of the rest. + + if !commitTime.IsZero() { + // We already set up ConsensusState for the next height + // (it happens in the call to cm.commitProposal). + // XXX: call cm.cs.SetupHeight() + } else { + // Round is over. This is a special case. + // Prepare a new RoundState for the next state. + cm.cs.SetupRound(rs.Round + 1) + return // setAlarm() takes care of the rest. + } } else { return // Action is not relevant. } @@ -747,6 +761,7 @@ var ( ErrPeerStateInvalidStartTime = errors.New("Error peer state invalid startTime") ) +// TODO: voteRanks should purge bygone validators. type PeerState struct { mtx sync.Mutex connected bool @@ -754,7 +769,7 @@ type PeerState struct { height uint32 startTime time.Time // Derived from offset seconds. blockPartsBitArray []byte - votesWanted map[uint64]float32 + voteRanks map[uint64]uint8 cbHeight uint32 cbRound uint16 cbFunc func() @@ -762,10 +777,10 @@ type PeerState struct { func NewPeerState(peer *p2p.Peer) *PeerState { return &PeerState{ - connected: true, - peer: peer, - height: 0, - votesWanted: make(map[uint64]float32), + connected: true, + peer: peer, + height: 0, + voteRanks: make(map[uint64]uint8), } } @@ -818,10 +833,15 @@ func (ps *PeerState) WantsVote(vote *Vote) bool { if !ps.connected { return false } - // Only wants the vote if votesWanted says so - if ps.votesWanted[vote.SignerId] <= 0 { - // TODO: sometimes, send unsolicited votes to see if peer wants it. - return false + // Only wants the vote if voteRank is low. + if ps.voteRanks[vote.SignerId] > voteRankCutoff { + // Sometimes, send unsolicited votes to see if peer wants it. + if rand.Float32() < unsolicitedVoteRate { + // Continue on... + } else { + // Rank too high. Do not send vote. + return false + } } // Only wants the vote if peer's current height and round matches. if ps.height == vote.Height { @@ -890,7 +910,7 @@ func (ps *PeerState) ApplyKnownBlockPartsMessage(msg *KnownBlockPartsMessage) er func (ps *PeerState) ApplyVoteRankMessage(msg *VoteRankMessage) error { ps.mtx.Lock() defer ps.mtx.Unlock() - // XXX IMPLEMENT + ps.voteRanks[msg.ValidatorId] = msg.Rank return nil } diff --git a/consensus/state.go b/consensus/state.go index 2b89ad6d..83c31cc4 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -22,6 +22,7 @@ type ConsensusState struct { startTime time.Time // Start of round 0 for this height. commits *VoteSet // Commits for this height. roundState *RoundState // The RoundState object for the current round. + commitTime time.Time // Time at which a block was found to be committed by +2/3. } func NewConsensusState(state *State) *ConsensusState { @@ -54,6 +55,7 @@ func (cs *ConsensusState) RoundState() *RoundState { return cs.roundState } +// Primarily gets called upon block commit by ConsensusManager. func (cs *ConsensusState) Update(state *State) { cs.mtx.Lock() defer cs.mtx.Unlock() @@ -68,7 +70,7 @@ func (cs *ConsensusState) Update(state *State) { cs.height = stateHeight cs.validatorsR0 = state.Validators().Copy() // NOTE: immutable. cs.lockedProposal = nil - cs.startTime = state.CommitTime() // XXX is this what we want? + cs.startTime = state.CommitTime().Add(newBlockWaitDuration) // NOTE: likely future time. cs.commits = NewVoteSet(stateHeight, 0, VoteTypeCommit, cs.validatorsR0) // Setup the roundState @@ -77,7 +79,7 @@ func (cs *ConsensusState) Update(state *State) { } -// If cs.roundSTate isn't at round, set up new roundState at round. +// If cs.roundState isn't at round, set up new roundState at round. func (cs *ConsensusState) SetupRound(round uint16) { cs.mtx.Lock() defer cs.mtx.Unlock() diff --git a/consensus/vote.go b/consensus/vote.go index b5dfe2c7..db0add10 100644 --- a/consensus/vote.go +++ b/consensus/vote.go @@ -5,6 +5,7 @@ import ( "errors" "io" "sync" + "time" . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/blocks" @@ -61,16 +62,19 @@ func (v *Vote) GetDocument() string { // VoteSet helps collect signatures from validators at each height+round // for a predefined vote type. +// TODO: test majority calculations etc. type VoteSet struct { - mtx sync.Mutex - height uint32 - round uint16 - type_ byte - validators *ValidatorSet - votes map[uint64]*Vote - votesByHash map[string]uint64 - totalVotes uint64 - totalVotingPower uint64 + mtx sync.Mutex + height uint32 + round uint16 + type_ byte + validators *ValidatorSet + votes map[uint64]*Vote + votesByHash map[string]uint64 + totalVotes uint64 + totalVotingPower uint64 + oneThirdMajority [][]byte + twoThirdsCommitTime time.Time } // Constructs a new VoteSet struct used to accumulate votes for each round. @@ -126,49 +130,38 @@ func (vs *VoteSet) AddVote(vote *Vote) (bool, error) { } } vs.votes[vote.SignerId] = vote - vs.votesByHash[string(vote.Hash)] += val.VotingPower + totalHashVotes := vs.votesByHash[string(vote.Hash)] + val.VotingPower + vs.votesByHash[string(vote.Hash)] = totalHashVotes vs.totalVotes += val.VotingPower + // If we just nudged it up to one thirds majority, add it. + if totalHashVotes > vs.totalVotingPower/3 && + (totalHashVotes-val.VotingPower) <= vs.totalVotingPower/3 { + vs.oneThirdMajority = append(vs.oneThirdMajority, vote.Hash) + } else if totalHashVotes > vs.totalVotingPower*2/3 && + (totalHashVotes-val.VotingPower) <= vs.totalVotingPower*2/3 { + vs.twoThirdsCommitTime = time.Now() + } return true, nil } // Returns either a blockhash (or nil) that received +2/3 majority. // If there exists no such majority, returns (nil, false). -func (vs *VoteSet) TwoThirdsMajority() (hash []byte, ok bool) { +func (vs *VoteSet) TwoThirdsMajority() (hash []byte, commitTime time.Time, ok bool) { vs.mtx.Lock() defer vs.mtx.Unlock() - twoThirdsMajority := (vs.totalVotingPower*2 + 2) / 3 - if vs.totalVotes < twoThirdsMajority { - return nil, false - } - for hash, votes := range vs.votesByHash { - if votes >= twoThirdsMajority { - if hash == "" { - return nil, true - } else { - return []byte(hash), true - } + // There's only one or two in the array. + for _, hash := range vs.oneThirdMajority { + if vs.votesByHash[string(hash)] > vs.totalVotingPower*2/3 { + return hash, vs.twoThirdsCommitTime, true } } - return nil, false + return nil, time.Time{}, false } // Returns blockhashes (or nil) that received a +1/3 majority. // If there exists no such majority, returns nil. -func (vs *VoteSet) OneThirdMajority() (hashes []interface{}) { +func (vs *VoteSet) OneThirdMajority() (hashes [][]byte) { vs.mtx.Lock() defer vs.mtx.Unlock() - oneThirdMajority := (vs.totalVotingPower + 2) / 3 - if vs.totalVotes < oneThirdMajority { - return nil - } - for hash, votes := range vs.votesByHash { - if votes >= oneThirdMajority { - if hash == "" { - hashes = append(hashes, nil) - } else { - hashes = append(hashes, []byte(hash)) - } - } - } - return hashes + return vs.oneThirdMajority } diff --git a/state/state.go b/state/state.go index 85db4dfb..0d5298ee 100644 --- a/state/state.go +++ b/state/state.go @@ -52,15 +52,19 @@ func LoadState(db db_.Db) *State { return s } -func (s *State) Save() { +// Save this state into the db. +// For convenience, the commitTime (required by ConsensusManager) +// is saved here. +func (s *State) Save(commitTime time.Time) { s.mtx.Lock() defer s.mtx.Unlock() + s.commitTime = commitTime s.accounts.Save() var buf bytes.Buffer var n int64 var err error WriteUInt32(&buf, s.height, &n, &err) - WriteTime(&buf, s.commitTime, &n, &err) + WriteTime(&buf, commitTime, &n, &err) WriteByteSlice(&buf, s.accounts.Hash(), &n, &err) for _, validator := range s.validators.Map() { WriteBinary(&buf, validator, &n, &err) @@ -91,7 +95,9 @@ func (s *State) CommitTx(tx *Tx) error { return nil } -func (s *State) CommitBlock(b *Block, commitTime time.Time) error { +// This is called during staging. +// The resulting state is cached until it is actually committed. +func (s *State) CommitBlock(b *Block) error { s.mtx.Lock() defer s.mtx.Unlock() // TODO commit the txs