From 5dfa2ecebb45721fec260947d16333985f9a0ec0 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 7 Sep 2014 02:21:25 -0700 Subject: [PATCH] share block parts when we're the proposer --- blocks/block.go | 36 +--- {consensus => blocks}/block_part_set.go | 97 +++++---- blocks/store.go | 15 +- consensus/consensus.go | 276 ++++++++++++++++++------ consensus/document.go | 41 ++++ consensus/priv_validator.go | 22 +- consensus/state.go | 7 +- consensus/vote.go | 54 +---- merkle/util.go | 25 ++- state/account.go | 23 +- state/store.go | 19 -- 11 files changed, 390 insertions(+), 225 deletions(-) rename {consensus => blocks}/block_part_set.go (51%) create mode 100644 consensus/document.go delete mode 100644 state/store.go diff --git a/blocks/block.go b/blocks/block.go index 21fd1eba..2a849401 100644 --- a/blocks/block.go +++ b/blocks/block.go @@ -2,13 +2,11 @@ package blocks import ( "crypto/sha256" - "fmt" "io" "time" . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" - "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/merkle" ) @@ -16,14 +14,6 @@ const ( defaultBlockPartSizeBytes = 4096 ) -func CalcBlockURI(height uint32, hash []byte) string { - return fmt.Sprintf("%v://block/%v#%X", - config.Config.Network, - height, - hash, - ) -} - type Block struct { Header Validation @@ -49,15 +39,10 @@ func (b *Block) WriteTo(w io.Writer) (n int64, err error) { } func (b *Block) ValidateBasic() error { - // Basic validation that doesn't involve context. - // XXX + // TODO Basic validation that doesn't involve context. return nil } -func (b *Block) URI() string { - return CalcBlockURI(b.Height, b.Hash()) -} - func (b *Block) Hash() []byte { if b.hash != nil { return b.hash @@ -73,7 +58,8 @@ func (b *Block) Hash() []byte { } // The returns parts must be signed afterwards. -func (b *Block) ToBlockParts() (parts []*BlockPart) { +func (b *Block) ToBlockPartSet() *BlockPartSet { + var parts []*BlockPart blockBytes := BinaryBytes(b) total := (len(blockBytes) + defaultBlockPartSizeBytes - 1) / defaultBlockPartSizeBytes for i := 0; i < total; i++ { @@ -90,7 +76,7 @@ func (b *Block) ToBlockParts() (parts []*BlockPart) { } parts = append(parts, part) } - return parts + return NewBlockPartSet(b.Height, parts) } //----------------------------------------------------------------------------- @@ -133,18 +119,8 @@ func (bp *BlockPart) WriteTo(w io.Writer) (n int64, err error) { return } -func (bp *BlockPart) URI() string { - return fmt.Sprintf("%v://block/%v/%v[%v/%v]#%X\n", - config.Config.Network, - bp.Height, - bp.Round, - bp.Index, - bp.Total, - bp.BlockPartHash(), - ) -} - -func (bp *BlockPart) BlockPartHash() []byte { +// Hash returns the hash of the block part data bytes. +func (bp *BlockPart) Hash() []byte { if bp.hash != nil { return bp.hash } else { diff --git a/consensus/block_part_set.go b/blocks/block_part_set.go similarity index 51% rename from consensus/block_part_set.go rename to blocks/block_part_set.go index 0ce3ad3a..d97f4e8d 100644 --- a/consensus/block_part_set.go +++ b/blocks/block_part_set.go @@ -1,50 +1,44 @@ -package consensus +package blocks import ( "bytes" "errors" "sync" - . "github.com/tendermint/tendermint/blocks" - . "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/merkle" ) -// Helper for keeping track of block parts. +// A collection of block parts. +// Doesn't do any validation. type BlockPartSet struct { mtx sync.Mutex - signer *Account height uint32 - round uint16 // Not used - total uint16 - numParts uint16 + total uint16 // total number of parts + numParts uint16 // number of parts in this set parts []*BlockPart _block *Block // cache } var ( - ErrInvalidBlockPartSignature = errors.New("Invalid block part signature") // Peer gave us a fake part - ErrInvalidBlockPartConflict = errors.New("Invalid block part conflict") // Signer signed conflicting parts + ErrInvalidBlockPartConflict = errors.New("Invalid block part conflict") // Signer signed conflicting parts ) -// Signer may be nil if signer is unknown beforehand. -func NewBlockPartSet(height uint32, round uint16, signer *Account) *BlockPartSet { - return &BlockPartSet{ - signer: signer, - height: height, - round: round, +// parts may be nil if the parts aren't in hand. +func NewBlockPartSet(height uint32, parts []*BlockPart) *BlockPartSet { + bps := &BlockPartSet{ + height: height, + parts: parts, + numParts: uint16(len(parts)), } + if len(parts) > 0 { + bps.total = parts[0].Total + } + return bps } -// In the case where the signer wasn't known prior to NewBlockPartSet(), -// user should call SetSigner() prior to AddBlockPart(). -func (bps *BlockPartSet) SetSigner(signer *Account) { - bps.mtx.Lock() - defer bps.mtx.Unlock() - if bps.signer != nil { - panic("BlockPartSet signer already set.") - } - bps.signer = signer +func (bps *BlockPartSet) Height() uint32 { + return bps.height } func (bps *BlockPartSet) BlockParts() []*BlockPart { @@ -69,19 +63,12 @@ func (bps *BlockPartSet) BitArray() []byte { } // If the part isn't valid, returns an error. -// err can be ErrInvalidBlockPart[Conflict|Signature] +// err can be ErrInvalidBlockPartConflict +// NOTE: Caller must check the signature before adding. func (bps *BlockPartSet) AddBlockPart(part *BlockPart) (added bool, err error) { bps.mtx.Lock() defer bps.mtx.Unlock() - // If part is invalid, return an error. - /* XXX - err = part.ValidateWithSigner(bps.signer) - if err != nil { - return false, err - } - */ - if bps.parts == nil { // First received part for this round. bps.parts = make([]*BlockPart, part.Total) @@ -128,14 +115,44 @@ func (bps *BlockPartSet) Block() *Block { bps.mtx.Lock() defer bps.mtx.Unlock() if bps._block == nil { - blockBytes := []byte{} - for _, part := range bps.parts { - blockBytes = append(blockBytes, part.Bytes...) + block, err := BlockPartsToBlock(bps.parts) + if err != nil { + panic(err) } - var n int64 - var err error - block := ReadBlock(bytes.NewReader(blockBytes), &n, &err) bps._block = block } return bps._block } + +func (bps *BlockPartSet) Hash() []byte { + if !bps.IsComplete() { + panic("Cannot get hash of an incomplete BlockPartSet") + } + hashes := [][]byte{} + for _, part := range bps.parts { + partHash := part.Hash() + hashes = append(hashes, partHash) + } + return merkle.HashFromByteSlices(hashes) +} + +// The proposal hash includes both the block hash +// as well as the BlockPartSet merkle hash. +func (bps *BlockPartSet) ProposalHash() []byte { + bpsHash := bps.Hash() + blockHash := bps.Block().Hash() + return merkle.HashFromByteSlices([][]byte{bpsHash, blockHash}) +} + +//----------------------------------------------------------------------------- + +func BlockPartsToBlock(parts []*BlockPart) (*Block, error) { + blockBytes := []byte{} + for _, part := range parts { + blockBytes = append(blockBytes, part.Bytes...) + } + var n int64 + var err error + block := ReadBlock(bytes.NewReader(blockBytes), &n, &err) + return block, err +} diff --git a/blocks/store.go b/blocks/store.go index 0f4391b5..54a0f3a2 100644 --- a/blocks/store.go +++ b/blocks/store.go @@ -90,8 +90,19 @@ func (bs *BlockStore) LoadBlock(height uint32) *Block { if part0 == nil { return nil } - // XXX implement - panic("TODO: Not implemented") + parts := []*BlockPart{part0} + for i := uint16(1); i < part0.Total; i++ { + part := bs.LoadBlockPart(height, i) + if part == nil { + Panicf("Failed to retrieve block part %v at height %v", i, height) + } + parts = append(parts, part) + } + block, err := BlockPartsToBlock(parts) + if err != nil { + panic(err) + } + return block } // NOTE: Assumes that parts as well as the block are valid. See StageBlockParts(). diff --git a/consensus/consensus.go b/consensus/consensus.go index b8df6257..57b32cc7 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -33,16 +33,6 @@ const ( //----------------------------------------------------------------------------- -// convenience -func calcRoundInfo(startTime time.Time) (round uint16, roundStartTime time.Time, roundDuration time.Duration, roundElapsed time.Duration, elapsedRatio float64) { - round = calcRound(startTime) - roundStartTime = calcRoundStartTime(round, startTime) - roundDuration = calcRoundDuration(round) - roundElapsed = time.Now().Sub(roundStartTime) - elapsedRatio = float64(roundElapsed) / float64(roundDuration) - return -} - // total duration of given round func calcRoundDuration(round uint16) time.Duration { return roundDuration0 + roundDurationDelta*time.Duration(round) @@ -80,6 +70,17 @@ func calcRound(startTime time.Time) uint16 { return uint16(R) } +// convenience +func calcRoundInfo(startTime time.Time) (round uint16, roundStartTime time.Time, roundDuration time.Duration, + roundElapsed time.Duration, elapsedRatio float64) { + round = calcRound(startTime) + roundStartTime = calcRoundStartTime(round, startTime) + roundDuration = calcRoundDuration(round) + roundElapsed = time.Now().Sub(roundStartTime) + elapsedRatio = float64(roundElapsed) / float64(roundDuration) + return +} + //----------------------------------------------------------------------------- type ConsensusManager struct { @@ -127,6 +128,12 @@ func (cm *ConsensusManager) SetPrivValidator(priv *PrivValidator) { cm.privValidator = priv } +func (cm *ConsensusManager) PrivValidator() *PrivValidator { + cm.mtx.Lock() + defer cm.mtx.Unlock() + return cm.privValidator +} + func (cm *ConsensusManager) Start() { if atomic.CompareAndSwapUint32(&cm.started, 0, 1) { log.Info("Starting ConsensusManager") @@ -164,12 +171,16 @@ func (cm *ConsensusManager) switchEventsRoutine() { // By sending KnownBlockPartsMessage, // we send our height/round + startTime, and known block parts, // which is sufficient for the peer to begin interacting with us. - event.Peer.TrySend(ProposalCh, cm.makeKnownBlockPartsMessage()) + event.Peer.TrySend(ProposalCh, cm.makeKnownBlockPartsMessage(cm.cs.RoundState())) case p2p.SwitchEventDonePeer: event := swEvent.(p2p.SwitchEventDonePeer) // Delete peerState for event.Peer cm.mtx.Lock() - delete(cm.peerStates, event.Peer.Key) + peerState := cm.peerStates[event.Peer.Key] + if peerState != nil { + peerState.Disconnect() + delete(cm.peerStates, event.Peer.Key) + } cm.mtx.Unlock() default: log.Warning("Unhandled switch event type") @@ -178,23 +189,19 @@ func (cm *ConsensusManager) switchEventsRoutine() { } // Like, how large is it and how often can we send it? -func (cm *ConsensusManager) makeKnownBlockPartsMessage() *KnownBlockPartsMessage { - rs := cm.cs.RoundState() +func (cm *ConsensusManager) makeKnownBlockPartsMessage(rs *RoundState) *KnownBlockPartsMessage { return &KnownBlockPartsMessage{ Height: rs.Height, SecondsSinceStartTime: uint32(time.Now().Sub(rs.StartTime).Seconds()), - BlockPartsBitArray: rs.BlockPartSet.BitArray(), + BlockPartsBitArray: rs.Proposal.BitArray(), } } +// NOTE: may return nil, but (nil).Wants*() returns false. func (cm *ConsensusManager) getPeerState(peer *p2p.Peer) *PeerState { cm.mtx.Lock() defer cm.mtx.Unlock() - peerState := cm.peerStates[peer.Key] - if peerState == nil { - log.Warning("Wanted peerState for %v but none exists", peer) - } - return peerState + return cm.peerStates[peer.Key] } func (cm *ConsensusManager) gossipProposalRoutine() { @@ -221,27 +228,33 @@ OUTER_LOOP: // TODO Continue if we've already voted, then no point processing the part. + // Check that the signature is valid and from proposer. + if rs.Proposer.Verify(msg.BlockPart.Hash(), msg.BlockPart.Signature) { + // TODO handle bad peer. + continue OUTER_LOOP + } + + // If we are the proposer, then don't do anything else. + // We're already sending peers our proposal on another routine. + privValidator := cm.PrivValidator() + if privValidator != nil && rs.Proposer.Account.Id == privValidator.Id { + continue OUTER_LOOP + } + // Add and process the block part - added, err := rs.BlockPartSet.AddBlockPart(msg.BlockPart) + added, err := rs.Proposal.AddBlockPart(msg.BlockPart) if err == ErrInvalidBlockPartConflict { // TODO: Bad validator - } else if err == ErrInvalidBlockPartSignature { - // TODO: Bad peer } else if err != nil { Panicf("Unexpected blockPartsSet error %v", err) } if added { // If peer wants this part, send peer the part // and our new blockParts state. - kbpMsg := cm.makeKnownBlockPartsMessage() + kbpMsg := cm.makeKnownBlockPartsMessage(rs) partMsg := &BlockPartMessage{BlockPart: msg.BlockPart} - PEERS_LOOP: for _, peer := range cm.sw.Peers().List() { peerState := cm.getPeerState(peer) - if peerState == nil { - // Peer disconnected before we were able to process. - continue PEERS_LOOP - } if peerState.WantsBlockPart(msg.BlockPart) { peer.TrySend(KnownPartsCh, kbpMsg) peer.TrySend(ProposalCh, partMsg) @@ -282,7 +295,7 @@ OUTER_LOOP: continue OUTER_LOOP } peerState := cm.getPeerState(inMsg.MConn.Peer) - if peerState == nil { + if !peerState.IsConnected() { // Peer disconnected before we were able to process. continue OUTER_LOOP } @@ -295,8 +308,9 @@ OUTER_LOOP: // Signs a vote document and broadcasts it. // hash can be nil to vote "nil" func (cm *ConsensusManager) signAndVote(vote *Vote) error { - if cm.privValidator != nil { - err := cm.privValidator.SignVote(vote) + privValidator := cm.PrivValidator() + if privValidator != nil { + err := privValidator.SignVote(vote) if err != nil { return err } @@ -345,8 +359,10 @@ func (cm *ConsensusManager) stageProposal(proposal *BlockPartSet) error { return nil } -func (cm *ConsensusManager) constructProposal(rs *RoundState) (*Block, error) { - // XXX implement +// Constructs an unsigned proposal +func (cm *ConsensusManager) constructProposal(rs *RoundState) (*BlockPartSet, error) { + // XXX implement, first implement mempool + // proposal := block.ToBlockPartSet() return nil, nil } @@ -367,7 +383,7 @@ func (cm *ConsensusManager) voteProposal(rs *RoundState) error { return err } // Stage proposal - err := cm.stageProposal(rs.BlockPartSet) + err := cm.stageProposal(rs.Proposal) if err != nil { // Vote for nil, whatever the error. err := cm.signAndVote(&Vote{ @@ -383,7 +399,7 @@ func (cm *ConsensusManager) voteProposal(rs *RoundState) error { Height: rs.Height, Round: rs.Round, Type: VoteTypeBare, - Hash: rs.BlockPartSet.Block().Hash(), + Hash: rs.Proposal.Block().Hash(), }) return err } @@ -402,13 +418,13 @@ func (cm *ConsensusManager) precommitProposal(rs *RoundState) error { // If proposal is invalid or unknown, do nothing. // See note on ZombieValidators to see why. - if cm.stageProposal(rs.BlockPartSet) != nil { + if cm.stageProposal(rs.Proposal) != nil { return nil } // Lock this proposal. // NOTE: we're unlocking any prior locks. - cm.cs.LockProposal(rs.BlockPartSet) + cm.cs.LockProposal(rs.Proposal) // Send precommit vote. err := cm.signAndVote(&Vote{ @@ -428,19 +444,19 @@ func (cm *ConsensusManager) precommitProposal(rs *RoundState) error { // Commit or unlock. // Call after RoundStepPrecommit, after round has expired. func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error { + // If there exists a 2/3 majority of precommits. + // Validate the block and commit. if hash, ok := rs.RoundPrecommits.TwoThirdsMajority(); ok { - // If there exists a 2/3 majority of precommits. - // Validate the block and commit. // If the proposal is invalid or we don't have it, // do not commit. // TODO If we were just late to receive the block, when // do we actually get it? Document it. - if cm.stageProposal(rs.BlockPartSet) != nil { + if cm.stageProposal(rs.Proposal) != nil { return nil } // TODO: Remove? - cm.cs.LockProposal(rs.BlockPartSet) + cm.cs.LockProposal(rs.Proposal) // Vote commit. err := cm.signAndVote(&Vote{ Height: rs.Height, @@ -457,7 +473,7 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error { // time differences between nodes, so nodes end up drifting // in time. commitTime := time.Now() - cm.commitProposal(rs.BlockPartSet, commitTime) + cm.commitProposal(rs.Proposal, commitTime) return nil } else { // Otherwise, if a 1/3 majority if a block that isn't our locked one exists, unlock. @@ -478,16 +494,16 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error { } } -func (cm *ConsensusManager) commitProposal(blockPartSet *BlockPartSet, commitTime time.Time) error { +func (cm *ConsensusManager) commitProposal(proposal *BlockPartSet, commitTime time.Time) error { cm.mtx.Lock() defer cm.mtx.Unlock() - if cm.stagedProposal != blockPartSet { + if cm.stagedProposal != proposal { panic("Unexpected stagedProposal.") // Shouldn't happen. } // Save to blockStore - block, blockParts := blockPartSet.Block(), blockPartSet.BlockParts() + block, blockParts := proposal.Block(), proposal.BlockParts() err := cm.blockStore.SaveBlockParts(block.Height, blockParts) if err != nil { return err @@ -502,6 +518,56 @@ func (cm *ConsensusManager) commitProposal(blockPartSet *BlockPartSet, commitTim return nil } +// Given a RoundState where we are the proposer, +// broadcast rs.proposal to all the peers. +func (cm *ConsensusManager) shareProposal(rs *RoundState) { + privValidator := cm.PrivValidator() + proposal := rs.Proposal + if privValidator == nil || proposal == nil { + return + } + privValidator.SignProposal(rs.Round, proposal) + blockParts := proposal.BlockParts() + peers := cm.sw.Peers().List() + if len(peers) == 0 { + log.Warning("Could not propose: no peers") + return + } + numBlockParts := uint16(len(blockParts)) + kbpMsg := cm.makeKnownBlockPartsMessage(rs) + for i, peer := range peers { + peerState := cm.getPeerState(peer) + if !peerState.IsConnected() { + continue // Peer was disconnected. + } + startIndex := uint16((i * len(blockParts)) / len(peers)) + // Create a function that when called, + // starts sending block parts to peer. + cb := func(peer *p2p.Peer, startIndex uint16) func() { + return func() { + // TODO: if the clocks are off a bit, + // peer may receive this before the round flips. + peer.Send(KnownPartsCh, kbpMsg) + for i := uint16(0); i < numBlockParts; i++ { + part := blockParts[(startIndex+i)%numBlockParts] + // Ensure round hasn't expired on our end. + currentRS := cm.cs.RoundState() + if currentRS != rs { + return + } + // If peer wants the block: + if peerState.WantsBlockPart(part) { + partMsg := &BlockPartMessage{BlockPart: part} + peer.Send(ProposalCh, partMsg) + } + } + } + }(peer, startIndex) + // Call immediately or schedule cb for when peer is ready. + peerState.SetRoundCallback(rs.Height, rs.Round, cb) + } +} + func (cm *ConsensusManager) gossipVoteRoutine() { OUTER_LOOP: for { @@ -544,13 +610,8 @@ OUTER_LOOP: } // Gossip vote. - PEERS_LOOP: for _, peer := range cm.sw.Peers().List() { peerState := cm.getPeerState(peer) - if peerState == nil { - // Peer disconnected before we were able to process. - continue PEERS_LOOP - } if peerState.WantsVote(vote) { msg := p2p.TypedMessage{msgTypeVote, vote} peer.TrySend(VoteCh, msg) @@ -630,17 +691,25 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() { if step == RoundStepProposal && rs.Step() == RoundStepStart { // Propose a block if I am the proposer. - if cm.privValidator != nil && rs.Proposer.Account.Id == cm.privValidator.Id { - block, err := cm.constructProposal(rs) - if err != nil { - log.Error("Error attempting to construct a proposal: %v", err) + privValidator := cm.PrivValidator() + if privValidator != nil && rs.Proposer.Account.Id == privValidator.Id { + // If we're already locked on a proposal, use that. + proposal := cm.cs.LockedProposal() + if proposal != nil { + // Otherwise, construct a new proposal. + var err error + proposal, err = cm.constructProposal(rs) + if err != nil { + log.Error("Error attempting to construct a proposal: %v", err) + return // Pretend like we weren't the proposer. Shrug. + } } - // XXX propose the block. - log.Error("XXX use ", block) - // XXX divide block into parts - // XXX communicate parts. - // XXX put this in another function. - panic("Implement block proposal!") + // Set proposal for roundState, so we vote correctly subsequently. + rs.Proposal = proposal + // Share the parts. + // We send all parts to all of our peers, but everyone receives parts + // starting at a different index, wrapping around back to 0. + cm.shareProposal(rs) } } else if step == RoundStepBareVotes && rs.Step() <= RoundStepProposal { err := cm.voteProposal(rs) @@ -680,28 +749,57 @@ var ( type PeerState struct { mtx sync.Mutex + connected bool peer *p2p.Peer height uint32 startTime time.Time // Derived from offset seconds. blockPartsBitArray []byte votesWanted map[uint64]float32 + cbHeight uint32 + cbRound uint16 + cbFunc func() } func NewPeerState(peer *p2p.Peer) *PeerState { return &PeerState{ + connected: true, peer: peer, height: 0, votesWanted: make(map[uint64]float32), } } -func (ps *PeerState) WantsBlockPart(part *BlockPart) bool { +func (ps *PeerState) IsConnected() bool { + if ps == nil { + return false + } ps.mtx.Lock() defer ps.mtx.Unlock() + return ps.connected +} + +func (ps *PeerState) Disconnect() { + ps.mtx.Lock() + defer ps.mtx.Unlock() + ps.connected = false +} + +func (ps *PeerState) WantsBlockPart(part *BlockPart) bool { + if ps == nil { + return false + } + ps.mtx.Lock() + defer ps.mtx.Unlock() + if !ps.connected { + return false + } // Only wants the part if peer's current height and round matches. if ps.height == part.Height { - round, _, _, _, elapsedRatio := calcRoundInfo(ps.startTime) - if round == part.Round && elapsedRatio < roundDeadlineBare { + round := calcRound(ps.startTime) + // NOTE: validators want to receive remaining block parts + // even after it had voted bare or precommit. + // Ergo, we do not check for which step the peer is in. + if round == part.Round { // Only wants the part if it doesn't already have it. if ps.blockPartsBitArray[part.Index/8]&byte(1<<(part.Index%8)) == 0 { return true @@ -712,8 +810,14 @@ func (ps *PeerState) WantsBlockPart(part *BlockPart) bool { } func (ps *PeerState) WantsVote(vote *Vote) bool { + if ps == nil { + return false + } ps.mtx.Lock() defer ps.mtx.Unlock() + if !ps.connected { + return false + } // Only wants the vote if votesWanted says so if ps.votesWanted[vote.SignerId] <= 0 { // TODO: sometimes, send unsolicited votes to see if peer wants it. @@ -773,6 +877,12 @@ func (ps *PeerState) ApplyKnownBlockPartsMessage(msg *KnownBlockPartsMessage) er ps.startTime = newStartTime ps.height = msg.Height ps.blockPartsBitArray = msg.BlockPartsBitArray + // Call callback if height+round matches. + peerRound := calcRound(ps.startTime) + if ps.cbFunc != nil && ps.cbHeight == ps.height && ps.cbRound == peerRound { + go ps.cbFunc() + ps.cbFunc = nil + } } return nil } @@ -784,6 +894,44 @@ func (ps *PeerState) ApplyVoteRankMessage(msg *VoteRankMessage) error { return nil } +// Sets a single round callback, to be called when the height+round comes around. +// If the height+round is current, calls "go f()" immediately. +// Otherwise, does nothing. +func (ps *PeerState) SetRoundCallback(height uint32, round uint16, f func()) { + ps.mtx.Lock() + defer ps.mtx.Unlock() + if ps.height < height { + ps.cbHeight = height + ps.cbRound = round + ps.cbFunc = f + // Wait until the height of the peerState changes. + // We'll call cbFunc then. + return + } else if ps.height == height { + peerRound := calcRound(ps.startTime) + if peerRound < round { + // Set a timer to call the cbFunc when the time comes. + go func() { + roundStart := calcRoundStartTime(round, ps.startTime) + time.Sleep(roundStart.Sub(time.Now())) + // If peer height is still good + ps.mtx.Lock() + peerHeight := ps.height + ps.mtx.Unlock() + if peerHeight == height { + f() + } + }() + } else if peerRound == round { + go f() + } else { + return + } + } else { + return + } +} + //----------------------------------------------------------------------------- // Messages diff --git a/consensus/document.go b/consensus/document.go new file mode 100644 index 00000000..bf9a362d --- /dev/null +++ b/consensus/document.go @@ -0,0 +1,41 @@ +package consensus + +import ( + "fmt" + . "github.com/tendermint/tendermint/config" +) + +func GenVoteDocument(voteType byte, height uint32, round uint16, proposalHash []byte) string { + stepName := "" + switch voteType { + case VoteTypeBare: + stepName = "bare" + case VoteTypePrecommit: + stepName = "precommit" + case VoteTypeCommit: + stepName = "commit" + default: + panic("Unknown vote type") + } + return fmt.Sprintf( + `-----BEGIN TENDERMINT DOCUMENT----- +URI: %v://consensus/%v/%v/%v +ProposalHash: %X +-----END TENDERMINT DOCUMENHT-----`, + Config.Network, height, round, stepName, + proposalHash, + ) +} + +func GenBlockPartDocument(height uint32, round uint16, index uint16, total uint16, blockPartHash []byte) string { + return fmt.Sprintf( + `-----BEGIN TENDERMINT DOCUMENT----- +URI: %v://blockpart/%v/%v/%v +Total: %v +BlockPartHash: %X +-----END TENDERMINT DOCUMENHT-----`, + Config.Network, height, round, index, + total, + blockPartHash, + ) +} diff --git a/consensus/priv_validator.go b/consensus/priv_validator.go index 89cc90fb..6e7c767d 100644 --- a/consensus/priv_validator.go +++ b/consensus/priv_validator.go @@ -1,20 +1,40 @@ package consensus import ( + . "github.com/tendermint/tendermint/blocks" db_ "github.com/tendermint/tendermint/db" . "github.com/tendermint/tendermint/state" ) //----------------------------------------------------------------------------- -// TODO: Ensure that double signing never happens via an external persistent check. type PrivValidator struct { PrivAccount db *db_.LevelDB } +// Returns new signed blockParts. +// If signatures already exist in proposal BlockParts, +// e.g. a locked proposal from some prior round, +// those signatures are overwritten. +// Double signing (signing multiple proposals for the same height&round) results in an error. +func (pv *PrivValidator) SignProposal(round uint16, proposal *BlockPartSet) (err error) { + //TODO: prevent double signing. + blockParts := proposal.BlockParts() + for i, part := range blockParts { + partHash := part.Hash() + doc := GenBlockPartDocument( + proposal.Height(), round, uint16(i), uint16(len(blockParts)), partHash) + part.Signature = pv.Sign([]byte(doc)) + } + return nil +} + // Modifies the vote object in memory. // Double signing results in an error. func (pv *PrivValidator) SignVote(vote *Vote) error { + //TODO: prevent double signing. + doc := GenVoteDocument(vote.Type, vote.Height, vote.Round, vote.Hash) + vote.Signature = pv.Sign([]byte(doc)) return nil } diff --git a/consensus/state.go b/consensus/state.go index e7cd5088..2b89ad6d 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -4,6 +4,7 @@ import ( "sync" "time" + . "github.com/tendermint/tendermint/blocks" . "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/state" ) @@ -131,7 +132,7 @@ type RoundState struct { Expires time.Time // Time after which this round is expired. Proposer *Validator // The proposer to propose a block for this round. Validators *ValidatorSet // All validators with modified accumPower for this round. - BlockPartSet *BlockPartSet // All block parts received for this round. + Proposal *BlockPartSet // All block parts received for this round. RoundBareVotes *VoteSet // All votes received for this round. RoundPrecommits *VoteSet // All precommits received for this round. Commits *VoteSet // A shared object for all commit votes of this height. @@ -144,7 +145,7 @@ func NewRoundState(height uint32, round uint16, startTime time.Time, validators *ValidatorSet, commits *VoteSet) *RoundState { proposer := validators.GetProposer() - blockPartSet := NewBlockPartSet(height, round, &(proposer.Account)) + blockPartSet := NewBlockPartSet(height, nil) roundBareVotes := NewVoteSet(height, round, VoteTypeBare, validators) roundPrecommits := NewVoteSet(height, round, VoteTypePrecommit, validators) @@ -155,7 +156,7 @@ func NewRoundState(height uint32, round uint16, startTime time.Time, Expires: calcRoundStartTime(round+1, startTime), Proposer: proposer, Validators: validators, - BlockPartSet: blockPartSet, + Proposal: blockPartSet, RoundBareVotes: roundBareVotes, RoundPrecommits: roundPrecommits, Commits: commits, diff --git a/consensus/vote.go b/consensus/vote.go index f3206045..b5dfe2c7 100644 --- a/consensus/vote.go +++ b/consensus/vote.go @@ -3,13 +3,11 @@ package consensus import ( "bytes" "errors" - "fmt" "io" "sync" . "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/blocks" - "github.com/tendermint/tendermint/config" . "github.com/tendermint/tendermint/state" ) @@ -30,7 +28,7 @@ var ( // Represents a bare, precommit, or commit vote for proposals. type Vote struct { Height uint32 - Round uint16 + Round uint16 // zero if commit vote. Type byte Hash []byte // empty if vote is nil. Signature @@ -55,45 +53,8 @@ func (v *Vote) WriteTo(w io.Writer) (n int64, err error) { return } -// This is the byteslice that validators should sign to signify a vote -// for the given proposal at given height & round. -// If hash is nil, the vote is a nil vote. -func (v *Vote) GetDocument() []byte { - switch v.Type { - case VoteTypeBare: - if len(v.Hash) == 0 { - doc := fmt.Sprintf("%v://consensus/%v/%v/b\nnil", - config.Config.Network, v.Height, v.Round) - return []byte(doc) - } else { - doc := fmt.Sprintf("%v://consensus/%v/%v/b\n%v", - config.Config.Network, v.Height, v.Round, - CalcBlockURI(v.Height, v.Hash)) - return []byte(doc) - } - case VoteTypePrecommit: - if len(v.Hash) == 0 { - doc := fmt.Sprintf("%v://consensus/%v/%v/p\nnil", - config.Config.Network, v.Height, v.Round) - return []byte(doc) - } else { - doc := fmt.Sprintf("%v://consensus/%v/%v/p\n%v", - config.Config.Network, v.Height, v.Round, - CalcBlockURI(v.Height, v.Hash)) - return []byte(doc) - } - case VoteTypeCommit: - if len(v.Hash) == 0 { - panic("Commit hash cannot be nil") - } else { - doc := fmt.Sprintf("%v://consensus/%v/c\n%v", - config.Config.Network, v.Height, // omit round info - CalcBlockURI(v.Height, v.Hash)) - return []byte(doc) - } - default: - panic("Unknown vote type") - } +func (v *Vote) GetDocument() string { + return GenVoteDocument(v.Type, v.Height, v.Round, v.Hash) } //----------------------------------------------------------------------------- @@ -114,6 +75,9 @@ type VoteSet struct { // Constructs a new VoteSet struct used to accumulate votes for each round. func NewVoteSet(height uint32, round uint16, type_ byte, validators *ValidatorSet) *VoteSet { + if type_ == VoteTypeCommit && round != 0 { + panic("Expected round 0 for commit vote set") + } totalVotingPower := uint64(0) for _, val := range validators.Map() { totalVotingPower += val.VotingPower @@ -137,7 +101,9 @@ func (vs *VoteSet) AddVote(vote *Vote) (bool, error) { defer vs.mtx.Unlock() // Make sure the phase matches. - if vote.Height != vs.height || vote.Round != vs.round || vote.Type != vs.type_ { + if vote.Height != vs.height || + (vote.Type != VoteTypeCommit && vote.Round != vs.round) || + vote.Type != vs.type_ { return false, ErrVoteUnexpectedPhase } @@ -147,7 +113,7 @@ func (vs *VoteSet) AddVote(vote *Vote) (bool, error) { return false, ErrVoteInvalidAccount } // Check signature. - if !val.Verify(vote.GetDocument(), vote.Signature.Bytes) { + if !val.Verify([]byte(vote.GetDocument()), vote.Signature) { // Bad signature. return false, ErrVoteInvalidSignature } diff --git a/merkle/util.go b/merkle/util.go index 8b538173..262dd9d2 100644 --- a/merkle/util.go +++ b/merkle/util.go @@ -8,12 +8,31 @@ import ( ) func HashFromByteSlices(items [][]byte) []byte { - panic("Implement me") - return nil + switch len(items) { + case 0: + panic("Cannot compute hash of empty slice") + case 1: + return items[0] + default: + var n int64 + var err error + var hasher = sha256.New() + hash := HashFromByteSlices(items[0 : len(items)/2]) + WriteByteSlice(hasher, hash, &n, &err) + if err != nil { + panic(err) + } + hash = HashFromByteSlices(items[len(items)/2:]) + WriteByteSlice(hasher, hash, &n, &err) + if err != nil { + panic(err) + } + return hasher.Sum(nil) + } } /* -Compute a deterministic merkle hash from a list of byteslices. +Compute a deterministic merkle hash from a list of Binary objects. */ func HashFromBinarySlice(items []Binary) []byte { switch len(items) { diff --git a/state/account.go b/state/account.go index d90450ca..3270dd48 100644 --- a/state/account.go +++ b/state/account.go @@ -19,7 +19,10 @@ func ReadAccount(r io.Reader, n *int64, err *error) *Account { } } -func (self *Account) Verify(msg []byte, sig []byte) bool { +func (self *Account) Verify(msg []byte, sig Signature) bool { + if sig.SignerId != self.Id { + return false + } return false } @@ -33,21 +36,3 @@ type PrivAccount struct { func (self *PrivAccount) Sign(msg []byte) Signature { return Signature{} } - -/* -// Signs the URI, which includes all data and metadata. -// XXX implement or change -func (bp *BlockPart) Sign(acc *PrivAccount) { - // TODO: populate Signature -} - -// XXX maybe change. -func (bp *BlockPart) ValidateWithSigner(signer *Account) error { - // TODO: Sanity check height, index, total, bytes, etc. - if !signer.Verify([]byte(bp.URI()), bp.Signature.Bytes) { - return ErrInvalidBlockPartSignature - } - return nil -} - -*/ diff --git a/state/store.go b/state/store.go deleted file mode 100644 index 8133fc89..00000000 --- a/state/store.go +++ /dev/null @@ -1,19 +0,0 @@ -package state - -import ( - . "github.com/tendermint/tendermint/blocks" -) - -// XXX ugh, bad name. -type StateStore struct { -} - -func (ss *StateStore) StageBlock(block *Block) error { - // XXX implement staging. - return nil -} - -func (ss *StateStore) CommitBlock(block *Block) error { - // XXX implement staging. - return nil -}