mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-24 02:01:43 +00:00
share block parts when we're the proposer
This commit is contained in:
@ -2,13 +2,11 @@ package blocks
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
. "github.com/tendermint/tendermint/binary"
|
||||
. "github.com/tendermint/tendermint/common"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/merkle"
|
||||
)
|
||||
|
||||
@ -16,14 +14,6 @@ const (
|
||||
defaultBlockPartSizeBytes = 4096
|
||||
)
|
||||
|
||||
func CalcBlockURI(height uint32, hash []byte) string {
|
||||
return fmt.Sprintf("%v://block/%v#%X",
|
||||
config.Config.Network,
|
||||
height,
|
||||
hash,
|
||||
)
|
||||
}
|
||||
|
||||
type Block struct {
|
||||
Header
|
||||
Validation
|
||||
@ -49,15 +39,10 @@ func (b *Block) WriteTo(w io.Writer) (n int64, err error) {
|
||||
}
|
||||
|
||||
func (b *Block) ValidateBasic() error {
|
||||
// Basic validation that doesn't involve context.
|
||||
// XXX
|
||||
// TODO Basic validation that doesn't involve context.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *Block) URI() string {
|
||||
return CalcBlockURI(b.Height, b.Hash())
|
||||
}
|
||||
|
||||
func (b *Block) Hash() []byte {
|
||||
if b.hash != nil {
|
||||
return b.hash
|
||||
@ -73,7 +58,8 @@ func (b *Block) Hash() []byte {
|
||||
}
|
||||
|
||||
// The returns parts must be signed afterwards.
|
||||
func (b *Block) ToBlockParts() (parts []*BlockPart) {
|
||||
func (b *Block) ToBlockPartSet() *BlockPartSet {
|
||||
var parts []*BlockPart
|
||||
blockBytes := BinaryBytes(b)
|
||||
total := (len(blockBytes) + defaultBlockPartSizeBytes - 1) / defaultBlockPartSizeBytes
|
||||
for i := 0; i < total; i++ {
|
||||
@ -90,7 +76,7 @@ func (b *Block) ToBlockParts() (parts []*BlockPart) {
|
||||
}
|
||||
parts = append(parts, part)
|
||||
}
|
||||
return parts
|
||||
return NewBlockPartSet(b.Height, parts)
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
@ -133,18 +119,8 @@ func (bp *BlockPart) WriteTo(w io.Writer) (n int64, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (bp *BlockPart) URI() string {
|
||||
return fmt.Sprintf("%v://block/%v/%v[%v/%v]#%X\n",
|
||||
config.Config.Network,
|
||||
bp.Height,
|
||||
bp.Round,
|
||||
bp.Index,
|
||||
bp.Total,
|
||||
bp.BlockPartHash(),
|
||||
)
|
||||
}
|
||||
|
||||
func (bp *BlockPart) BlockPartHash() []byte {
|
||||
// Hash returns the hash of the block part data bytes.
|
||||
func (bp *BlockPart) Hash() []byte {
|
||||
if bp.hash != nil {
|
||||
return bp.hash
|
||||
} else {
|
||||
|
@ -1,50 +1,44 @@
|
||||
package consensus
|
||||
package blocks
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
. "github.com/tendermint/tendermint/blocks"
|
||||
. "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/merkle"
|
||||
)
|
||||
|
||||
// Helper for keeping track of block parts.
|
||||
// A collection of block parts.
|
||||
// Doesn't do any validation.
|
||||
type BlockPartSet struct {
|
||||
mtx sync.Mutex
|
||||
signer *Account
|
||||
height uint32
|
||||
round uint16 // Not used
|
||||
total uint16
|
||||
numParts uint16
|
||||
total uint16 // total number of parts
|
||||
numParts uint16 // number of parts in this set
|
||||
parts []*BlockPart
|
||||
|
||||
_block *Block // cache
|
||||
}
|
||||
|
||||
var (
|
||||
ErrInvalidBlockPartSignature = errors.New("Invalid block part signature") // Peer gave us a fake part
|
||||
ErrInvalidBlockPartConflict = errors.New("Invalid block part conflict") // Signer signed conflicting parts
|
||||
ErrInvalidBlockPartConflict = errors.New("Invalid block part conflict") // Signer signed conflicting parts
|
||||
)
|
||||
|
||||
// Signer may be nil if signer is unknown beforehand.
|
||||
func NewBlockPartSet(height uint32, round uint16, signer *Account) *BlockPartSet {
|
||||
return &BlockPartSet{
|
||||
signer: signer,
|
||||
height: height,
|
||||
round: round,
|
||||
// parts may be nil if the parts aren't in hand.
|
||||
func NewBlockPartSet(height uint32, parts []*BlockPart) *BlockPartSet {
|
||||
bps := &BlockPartSet{
|
||||
height: height,
|
||||
parts: parts,
|
||||
numParts: uint16(len(parts)),
|
||||
}
|
||||
if len(parts) > 0 {
|
||||
bps.total = parts[0].Total
|
||||
}
|
||||
return bps
|
||||
}
|
||||
|
||||
// In the case where the signer wasn't known prior to NewBlockPartSet(),
|
||||
// user should call SetSigner() prior to AddBlockPart().
|
||||
func (bps *BlockPartSet) SetSigner(signer *Account) {
|
||||
bps.mtx.Lock()
|
||||
defer bps.mtx.Unlock()
|
||||
if bps.signer != nil {
|
||||
panic("BlockPartSet signer already set.")
|
||||
}
|
||||
bps.signer = signer
|
||||
func (bps *BlockPartSet) Height() uint32 {
|
||||
return bps.height
|
||||
}
|
||||
|
||||
func (bps *BlockPartSet) BlockParts() []*BlockPart {
|
||||
@ -69,19 +63,12 @@ func (bps *BlockPartSet) BitArray() []byte {
|
||||
}
|
||||
|
||||
// If the part isn't valid, returns an error.
|
||||
// err can be ErrInvalidBlockPart[Conflict|Signature]
|
||||
// err can be ErrInvalidBlockPartConflict
|
||||
// NOTE: Caller must check the signature before adding.
|
||||
func (bps *BlockPartSet) AddBlockPart(part *BlockPart) (added bool, err error) {
|
||||
bps.mtx.Lock()
|
||||
defer bps.mtx.Unlock()
|
||||
|
||||
// If part is invalid, return an error.
|
||||
/* XXX
|
||||
err = part.ValidateWithSigner(bps.signer)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
*/
|
||||
|
||||
if bps.parts == nil {
|
||||
// First received part for this round.
|
||||
bps.parts = make([]*BlockPart, part.Total)
|
||||
@ -128,14 +115,44 @@ func (bps *BlockPartSet) Block() *Block {
|
||||
bps.mtx.Lock()
|
||||
defer bps.mtx.Unlock()
|
||||
if bps._block == nil {
|
||||
blockBytes := []byte{}
|
||||
for _, part := range bps.parts {
|
||||
blockBytes = append(blockBytes, part.Bytes...)
|
||||
block, err := BlockPartsToBlock(bps.parts)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
var n int64
|
||||
var err error
|
||||
block := ReadBlock(bytes.NewReader(blockBytes), &n, &err)
|
||||
bps._block = block
|
||||
}
|
||||
return bps._block
|
||||
}
|
||||
|
||||
func (bps *BlockPartSet) Hash() []byte {
|
||||
if !bps.IsComplete() {
|
||||
panic("Cannot get hash of an incomplete BlockPartSet")
|
||||
}
|
||||
hashes := [][]byte{}
|
||||
for _, part := range bps.parts {
|
||||
partHash := part.Hash()
|
||||
hashes = append(hashes, partHash)
|
||||
}
|
||||
return merkle.HashFromByteSlices(hashes)
|
||||
}
|
||||
|
||||
// The proposal hash includes both the block hash
|
||||
// as well as the BlockPartSet merkle hash.
|
||||
func (bps *BlockPartSet) ProposalHash() []byte {
|
||||
bpsHash := bps.Hash()
|
||||
blockHash := bps.Block().Hash()
|
||||
return merkle.HashFromByteSlices([][]byte{bpsHash, blockHash})
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
func BlockPartsToBlock(parts []*BlockPart) (*Block, error) {
|
||||
blockBytes := []byte{}
|
||||
for _, part := range parts {
|
||||
blockBytes = append(blockBytes, part.Bytes...)
|
||||
}
|
||||
var n int64
|
||||
var err error
|
||||
block := ReadBlock(bytes.NewReader(blockBytes), &n, &err)
|
||||
return block, err
|
||||
}
|
@ -90,8 +90,19 @@ func (bs *BlockStore) LoadBlock(height uint32) *Block {
|
||||
if part0 == nil {
|
||||
return nil
|
||||
}
|
||||
// XXX implement
|
||||
panic("TODO: Not implemented")
|
||||
parts := []*BlockPart{part0}
|
||||
for i := uint16(1); i < part0.Total; i++ {
|
||||
part := bs.LoadBlockPart(height, i)
|
||||
if part == nil {
|
||||
Panicf("Failed to retrieve block part %v at height %v", i, height)
|
||||
}
|
||||
parts = append(parts, part)
|
||||
}
|
||||
block, err := BlockPartsToBlock(parts)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return block
|
||||
}
|
||||
|
||||
// NOTE: Assumes that parts as well as the block are valid. See StageBlockParts().
|
||||
|
@ -33,16 +33,6 @@ const (
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
// convenience
|
||||
func calcRoundInfo(startTime time.Time) (round uint16, roundStartTime time.Time, roundDuration time.Duration, roundElapsed time.Duration, elapsedRatio float64) {
|
||||
round = calcRound(startTime)
|
||||
roundStartTime = calcRoundStartTime(round, startTime)
|
||||
roundDuration = calcRoundDuration(round)
|
||||
roundElapsed = time.Now().Sub(roundStartTime)
|
||||
elapsedRatio = float64(roundElapsed) / float64(roundDuration)
|
||||
return
|
||||
}
|
||||
|
||||
// total duration of given round
|
||||
func calcRoundDuration(round uint16) time.Duration {
|
||||
return roundDuration0 + roundDurationDelta*time.Duration(round)
|
||||
@ -80,6 +70,17 @@ func calcRound(startTime time.Time) uint16 {
|
||||
return uint16(R)
|
||||
}
|
||||
|
||||
// convenience
|
||||
func calcRoundInfo(startTime time.Time) (round uint16, roundStartTime time.Time, roundDuration time.Duration,
|
||||
roundElapsed time.Duration, elapsedRatio float64) {
|
||||
round = calcRound(startTime)
|
||||
roundStartTime = calcRoundStartTime(round, startTime)
|
||||
roundDuration = calcRoundDuration(round)
|
||||
roundElapsed = time.Now().Sub(roundStartTime)
|
||||
elapsedRatio = float64(roundElapsed) / float64(roundDuration)
|
||||
return
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
type ConsensusManager struct {
|
||||
@ -127,6 +128,12 @@ func (cm *ConsensusManager) SetPrivValidator(priv *PrivValidator) {
|
||||
cm.privValidator = priv
|
||||
}
|
||||
|
||||
func (cm *ConsensusManager) PrivValidator() *PrivValidator {
|
||||
cm.mtx.Lock()
|
||||
defer cm.mtx.Unlock()
|
||||
return cm.privValidator
|
||||
}
|
||||
|
||||
func (cm *ConsensusManager) Start() {
|
||||
if atomic.CompareAndSwapUint32(&cm.started, 0, 1) {
|
||||
log.Info("Starting ConsensusManager")
|
||||
@ -164,12 +171,16 @@ func (cm *ConsensusManager) switchEventsRoutine() {
|
||||
// By sending KnownBlockPartsMessage,
|
||||
// we send our height/round + startTime, and known block parts,
|
||||
// which is sufficient for the peer to begin interacting with us.
|
||||
event.Peer.TrySend(ProposalCh, cm.makeKnownBlockPartsMessage())
|
||||
event.Peer.TrySend(ProposalCh, cm.makeKnownBlockPartsMessage(cm.cs.RoundState()))
|
||||
case p2p.SwitchEventDonePeer:
|
||||
event := swEvent.(p2p.SwitchEventDonePeer)
|
||||
// Delete peerState for event.Peer
|
||||
cm.mtx.Lock()
|
||||
delete(cm.peerStates, event.Peer.Key)
|
||||
peerState := cm.peerStates[event.Peer.Key]
|
||||
if peerState != nil {
|
||||
peerState.Disconnect()
|
||||
delete(cm.peerStates, event.Peer.Key)
|
||||
}
|
||||
cm.mtx.Unlock()
|
||||
default:
|
||||
log.Warning("Unhandled switch event type")
|
||||
@ -178,23 +189,19 @@ func (cm *ConsensusManager) switchEventsRoutine() {
|
||||
}
|
||||
|
||||
// Like, how large is it and how often can we send it?
|
||||
func (cm *ConsensusManager) makeKnownBlockPartsMessage() *KnownBlockPartsMessage {
|
||||
rs := cm.cs.RoundState()
|
||||
func (cm *ConsensusManager) makeKnownBlockPartsMessage(rs *RoundState) *KnownBlockPartsMessage {
|
||||
return &KnownBlockPartsMessage{
|
||||
Height: rs.Height,
|
||||
SecondsSinceStartTime: uint32(time.Now().Sub(rs.StartTime).Seconds()),
|
||||
BlockPartsBitArray: rs.BlockPartSet.BitArray(),
|
||||
BlockPartsBitArray: rs.Proposal.BitArray(),
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: may return nil, but (nil).Wants*() returns false.
|
||||
func (cm *ConsensusManager) getPeerState(peer *p2p.Peer) *PeerState {
|
||||
cm.mtx.Lock()
|
||||
defer cm.mtx.Unlock()
|
||||
peerState := cm.peerStates[peer.Key]
|
||||
if peerState == nil {
|
||||
log.Warning("Wanted peerState for %v but none exists", peer)
|
||||
}
|
||||
return peerState
|
||||
return cm.peerStates[peer.Key]
|
||||
}
|
||||
|
||||
func (cm *ConsensusManager) gossipProposalRoutine() {
|
||||
@ -221,27 +228,33 @@ OUTER_LOOP:
|
||||
|
||||
// TODO Continue if we've already voted, then no point processing the part.
|
||||
|
||||
// Check that the signature is valid and from proposer.
|
||||
if rs.Proposer.Verify(msg.BlockPart.Hash(), msg.BlockPart.Signature) {
|
||||
// TODO handle bad peer.
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
|
||||
// If we are the proposer, then don't do anything else.
|
||||
// We're already sending peers our proposal on another routine.
|
||||
privValidator := cm.PrivValidator()
|
||||
if privValidator != nil && rs.Proposer.Account.Id == privValidator.Id {
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
|
||||
// Add and process the block part
|
||||
added, err := rs.BlockPartSet.AddBlockPart(msg.BlockPart)
|
||||
added, err := rs.Proposal.AddBlockPart(msg.BlockPart)
|
||||
if err == ErrInvalidBlockPartConflict {
|
||||
// TODO: Bad validator
|
||||
} else if err == ErrInvalidBlockPartSignature {
|
||||
// TODO: Bad peer
|
||||
} else if err != nil {
|
||||
Panicf("Unexpected blockPartsSet error %v", err)
|
||||
}
|
||||
if added {
|
||||
// If peer wants this part, send peer the part
|
||||
// and our new blockParts state.
|
||||
kbpMsg := cm.makeKnownBlockPartsMessage()
|
||||
kbpMsg := cm.makeKnownBlockPartsMessage(rs)
|
||||
partMsg := &BlockPartMessage{BlockPart: msg.BlockPart}
|
||||
PEERS_LOOP:
|
||||
for _, peer := range cm.sw.Peers().List() {
|
||||
peerState := cm.getPeerState(peer)
|
||||
if peerState == nil {
|
||||
// Peer disconnected before we were able to process.
|
||||
continue PEERS_LOOP
|
||||
}
|
||||
if peerState.WantsBlockPart(msg.BlockPart) {
|
||||
peer.TrySend(KnownPartsCh, kbpMsg)
|
||||
peer.TrySend(ProposalCh, partMsg)
|
||||
@ -282,7 +295,7 @@ OUTER_LOOP:
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
peerState := cm.getPeerState(inMsg.MConn.Peer)
|
||||
if peerState == nil {
|
||||
if !peerState.IsConnected() {
|
||||
// Peer disconnected before we were able to process.
|
||||
continue OUTER_LOOP
|
||||
}
|
||||
@ -295,8 +308,9 @@ OUTER_LOOP:
|
||||
// Signs a vote document and broadcasts it.
|
||||
// hash can be nil to vote "nil"
|
||||
func (cm *ConsensusManager) signAndVote(vote *Vote) error {
|
||||
if cm.privValidator != nil {
|
||||
err := cm.privValidator.SignVote(vote)
|
||||
privValidator := cm.PrivValidator()
|
||||
if privValidator != nil {
|
||||
err := privValidator.SignVote(vote)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -345,8 +359,10 @@ func (cm *ConsensusManager) stageProposal(proposal *BlockPartSet) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cm *ConsensusManager) constructProposal(rs *RoundState) (*Block, error) {
|
||||
// XXX implement
|
||||
// Constructs an unsigned proposal
|
||||
func (cm *ConsensusManager) constructProposal(rs *RoundState) (*BlockPartSet, error) {
|
||||
// XXX implement, first implement mempool
|
||||
// proposal := block.ToBlockPartSet()
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
@ -367,7 +383,7 @@ func (cm *ConsensusManager) voteProposal(rs *RoundState) error {
|
||||
return err
|
||||
}
|
||||
// Stage proposal
|
||||
err := cm.stageProposal(rs.BlockPartSet)
|
||||
err := cm.stageProposal(rs.Proposal)
|
||||
if err != nil {
|
||||
// Vote for nil, whatever the error.
|
||||
err := cm.signAndVote(&Vote{
|
||||
@ -383,7 +399,7 @@ func (cm *ConsensusManager) voteProposal(rs *RoundState) error {
|
||||
Height: rs.Height,
|
||||
Round: rs.Round,
|
||||
Type: VoteTypeBare,
|
||||
Hash: rs.BlockPartSet.Block().Hash(),
|
||||
Hash: rs.Proposal.Block().Hash(),
|
||||
})
|
||||
return err
|
||||
}
|
||||
@ -402,13 +418,13 @@ func (cm *ConsensusManager) precommitProposal(rs *RoundState) error {
|
||||
|
||||
// If proposal is invalid or unknown, do nothing.
|
||||
// See note on ZombieValidators to see why.
|
||||
if cm.stageProposal(rs.BlockPartSet) != nil {
|
||||
if cm.stageProposal(rs.Proposal) != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Lock this proposal.
|
||||
// NOTE: we're unlocking any prior locks.
|
||||
cm.cs.LockProposal(rs.BlockPartSet)
|
||||
cm.cs.LockProposal(rs.Proposal)
|
||||
|
||||
// Send precommit vote.
|
||||
err := cm.signAndVote(&Vote{
|
||||
@ -428,19 +444,19 @@ func (cm *ConsensusManager) precommitProposal(rs *RoundState) error {
|
||||
// Commit or unlock.
|
||||
// Call after RoundStepPrecommit, after round has expired.
|
||||
func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error {
|
||||
// If there exists a 2/3 majority of precommits.
|
||||
// Validate the block and commit.
|
||||
if hash, ok := rs.RoundPrecommits.TwoThirdsMajority(); ok {
|
||||
// If there exists a 2/3 majority of precommits.
|
||||
// Validate the block and commit.
|
||||
|
||||
// If the proposal is invalid or we don't have it,
|
||||
// do not commit.
|
||||
// TODO If we were just late to receive the block, when
|
||||
// do we actually get it? Document it.
|
||||
if cm.stageProposal(rs.BlockPartSet) != nil {
|
||||
if cm.stageProposal(rs.Proposal) != nil {
|
||||
return nil
|
||||
}
|
||||
// TODO: Remove?
|
||||
cm.cs.LockProposal(rs.BlockPartSet)
|
||||
cm.cs.LockProposal(rs.Proposal)
|
||||
// Vote commit.
|
||||
err := cm.signAndVote(&Vote{
|
||||
Height: rs.Height,
|
||||
@ -457,7 +473,7 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error {
|
||||
// time differences between nodes, so nodes end up drifting
|
||||
// in time.
|
||||
commitTime := time.Now()
|
||||
cm.commitProposal(rs.BlockPartSet, commitTime)
|
||||
cm.commitProposal(rs.Proposal, commitTime)
|
||||
return nil
|
||||
} else {
|
||||
// Otherwise, if a 1/3 majority if a block that isn't our locked one exists, unlock.
|
||||
@ -478,16 +494,16 @@ func (cm *ConsensusManager) commitOrUnlockProposal(rs *RoundState) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *ConsensusManager) commitProposal(blockPartSet *BlockPartSet, commitTime time.Time) error {
|
||||
func (cm *ConsensusManager) commitProposal(proposal *BlockPartSet, commitTime time.Time) error {
|
||||
cm.mtx.Lock()
|
||||
defer cm.mtx.Unlock()
|
||||
|
||||
if cm.stagedProposal != blockPartSet {
|
||||
if cm.stagedProposal != proposal {
|
||||
panic("Unexpected stagedProposal.") // Shouldn't happen.
|
||||
}
|
||||
|
||||
// Save to blockStore
|
||||
block, blockParts := blockPartSet.Block(), blockPartSet.BlockParts()
|
||||
block, blockParts := proposal.Block(), proposal.BlockParts()
|
||||
err := cm.blockStore.SaveBlockParts(block.Height, blockParts)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -502,6 +518,56 @@ func (cm *ConsensusManager) commitProposal(blockPartSet *BlockPartSet, commitTim
|
||||
return nil
|
||||
}
|
||||
|
||||
// Given a RoundState where we are the proposer,
|
||||
// broadcast rs.proposal to all the peers.
|
||||
func (cm *ConsensusManager) shareProposal(rs *RoundState) {
|
||||
privValidator := cm.PrivValidator()
|
||||
proposal := rs.Proposal
|
||||
if privValidator == nil || proposal == nil {
|
||||
return
|
||||
}
|
||||
privValidator.SignProposal(rs.Round, proposal)
|
||||
blockParts := proposal.BlockParts()
|
||||
peers := cm.sw.Peers().List()
|
||||
if len(peers) == 0 {
|
||||
log.Warning("Could not propose: no peers")
|
||||
return
|
||||
}
|
||||
numBlockParts := uint16(len(blockParts))
|
||||
kbpMsg := cm.makeKnownBlockPartsMessage(rs)
|
||||
for i, peer := range peers {
|
||||
peerState := cm.getPeerState(peer)
|
||||
if !peerState.IsConnected() {
|
||||
continue // Peer was disconnected.
|
||||
}
|
||||
startIndex := uint16((i * len(blockParts)) / len(peers))
|
||||
// Create a function that when called,
|
||||
// starts sending block parts to peer.
|
||||
cb := func(peer *p2p.Peer, startIndex uint16) func() {
|
||||
return func() {
|
||||
// TODO: if the clocks are off a bit,
|
||||
// peer may receive this before the round flips.
|
||||
peer.Send(KnownPartsCh, kbpMsg)
|
||||
for i := uint16(0); i < numBlockParts; i++ {
|
||||
part := blockParts[(startIndex+i)%numBlockParts]
|
||||
// Ensure round hasn't expired on our end.
|
||||
currentRS := cm.cs.RoundState()
|
||||
if currentRS != rs {
|
||||
return
|
||||
}
|
||||
// If peer wants the block:
|
||||
if peerState.WantsBlockPart(part) {
|
||||
partMsg := &BlockPartMessage{BlockPart: part}
|
||||
peer.Send(ProposalCh, partMsg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}(peer, startIndex)
|
||||
// Call immediately or schedule cb for when peer is ready.
|
||||
peerState.SetRoundCallback(rs.Height, rs.Round, cb)
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *ConsensusManager) gossipVoteRoutine() {
|
||||
OUTER_LOOP:
|
||||
for {
|
||||
@ -544,13 +610,8 @@ OUTER_LOOP:
|
||||
}
|
||||
|
||||
// Gossip vote.
|
||||
PEERS_LOOP:
|
||||
for _, peer := range cm.sw.Peers().List() {
|
||||
peerState := cm.getPeerState(peer)
|
||||
if peerState == nil {
|
||||
// Peer disconnected before we were able to process.
|
||||
continue PEERS_LOOP
|
||||
}
|
||||
if peerState.WantsVote(vote) {
|
||||
msg := p2p.TypedMessage{msgTypeVote, vote}
|
||||
peer.TrySend(VoteCh, msg)
|
||||
@ -630,17 +691,25 @@ func (cm *ConsensusManager) proposeAndVoteRoutine() {
|
||||
|
||||
if step == RoundStepProposal && rs.Step() == RoundStepStart {
|
||||
// Propose a block if I am the proposer.
|
||||
if cm.privValidator != nil && rs.Proposer.Account.Id == cm.privValidator.Id {
|
||||
block, err := cm.constructProposal(rs)
|
||||
if err != nil {
|
||||
log.Error("Error attempting to construct a proposal: %v", err)
|
||||
privValidator := cm.PrivValidator()
|
||||
if privValidator != nil && rs.Proposer.Account.Id == privValidator.Id {
|
||||
// If we're already locked on a proposal, use that.
|
||||
proposal := cm.cs.LockedProposal()
|
||||
if proposal != nil {
|
||||
// Otherwise, construct a new proposal.
|
||||
var err error
|
||||
proposal, err = cm.constructProposal(rs)
|
||||
if err != nil {
|
||||
log.Error("Error attempting to construct a proposal: %v", err)
|
||||
return // Pretend like we weren't the proposer. Shrug.
|
||||
}
|
||||
}
|
||||
// XXX propose the block.
|
||||
log.Error("XXX use ", block)
|
||||
// XXX divide block into parts
|
||||
// XXX communicate parts.
|
||||
// XXX put this in another function.
|
||||
panic("Implement block proposal!")
|
||||
// Set proposal for roundState, so we vote correctly subsequently.
|
||||
rs.Proposal = proposal
|
||||
// Share the parts.
|
||||
// We send all parts to all of our peers, but everyone receives parts
|
||||
// starting at a different index, wrapping around back to 0.
|
||||
cm.shareProposal(rs)
|
||||
}
|
||||
} else if step == RoundStepBareVotes && rs.Step() <= RoundStepProposal {
|
||||
err := cm.voteProposal(rs)
|
||||
@ -680,28 +749,57 @@ var (
|
||||
|
||||
type PeerState struct {
|
||||
mtx sync.Mutex
|
||||
connected bool
|
||||
peer *p2p.Peer
|
||||
height uint32
|
||||
startTime time.Time // Derived from offset seconds.
|
||||
blockPartsBitArray []byte
|
||||
votesWanted map[uint64]float32
|
||||
cbHeight uint32
|
||||
cbRound uint16
|
||||
cbFunc func()
|
||||
}
|
||||
|
||||
func NewPeerState(peer *p2p.Peer) *PeerState {
|
||||
return &PeerState{
|
||||
connected: true,
|
||||
peer: peer,
|
||||
height: 0,
|
||||
votesWanted: make(map[uint64]float32),
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *PeerState) WantsBlockPart(part *BlockPart) bool {
|
||||
func (ps *PeerState) IsConnected() bool {
|
||||
if ps == nil {
|
||||
return false
|
||||
}
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
return ps.connected
|
||||
}
|
||||
|
||||
func (ps *PeerState) Disconnect() {
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
ps.connected = false
|
||||
}
|
||||
|
||||
func (ps *PeerState) WantsBlockPart(part *BlockPart) bool {
|
||||
if ps == nil {
|
||||
return false
|
||||
}
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
if !ps.connected {
|
||||
return false
|
||||
}
|
||||
// Only wants the part if peer's current height and round matches.
|
||||
if ps.height == part.Height {
|
||||
round, _, _, _, elapsedRatio := calcRoundInfo(ps.startTime)
|
||||
if round == part.Round && elapsedRatio < roundDeadlineBare {
|
||||
round := calcRound(ps.startTime)
|
||||
// NOTE: validators want to receive remaining block parts
|
||||
// even after it had voted bare or precommit.
|
||||
// Ergo, we do not check for which step the peer is in.
|
||||
if round == part.Round {
|
||||
// Only wants the part if it doesn't already have it.
|
||||
if ps.blockPartsBitArray[part.Index/8]&byte(1<<(part.Index%8)) == 0 {
|
||||
return true
|
||||
@ -712,8 +810,14 @@ func (ps *PeerState) WantsBlockPart(part *BlockPart) bool {
|
||||
}
|
||||
|
||||
func (ps *PeerState) WantsVote(vote *Vote) bool {
|
||||
if ps == nil {
|
||||
return false
|
||||
}
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
if !ps.connected {
|
||||
return false
|
||||
}
|
||||
// Only wants the vote if votesWanted says so
|
||||
if ps.votesWanted[vote.SignerId] <= 0 {
|
||||
// TODO: sometimes, send unsolicited votes to see if peer wants it.
|
||||
@ -773,6 +877,12 @@ func (ps *PeerState) ApplyKnownBlockPartsMessage(msg *KnownBlockPartsMessage) er
|
||||
ps.startTime = newStartTime
|
||||
ps.height = msg.Height
|
||||
ps.blockPartsBitArray = msg.BlockPartsBitArray
|
||||
// Call callback if height+round matches.
|
||||
peerRound := calcRound(ps.startTime)
|
||||
if ps.cbFunc != nil && ps.cbHeight == ps.height && ps.cbRound == peerRound {
|
||||
go ps.cbFunc()
|
||||
ps.cbFunc = nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -784,6 +894,44 @@ func (ps *PeerState) ApplyVoteRankMessage(msg *VoteRankMessage) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sets a single round callback, to be called when the height+round comes around.
|
||||
// If the height+round is current, calls "go f()" immediately.
|
||||
// Otherwise, does nothing.
|
||||
func (ps *PeerState) SetRoundCallback(height uint32, round uint16, f func()) {
|
||||
ps.mtx.Lock()
|
||||
defer ps.mtx.Unlock()
|
||||
if ps.height < height {
|
||||
ps.cbHeight = height
|
||||
ps.cbRound = round
|
||||
ps.cbFunc = f
|
||||
// Wait until the height of the peerState changes.
|
||||
// We'll call cbFunc then.
|
||||
return
|
||||
} else if ps.height == height {
|
||||
peerRound := calcRound(ps.startTime)
|
||||
if peerRound < round {
|
||||
// Set a timer to call the cbFunc when the time comes.
|
||||
go func() {
|
||||
roundStart := calcRoundStartTime(round, ps.startTime)
|
||||
time.Sleep(roundStart.Sub(time.Now()))
|
||||
// If peer height is still good
|
||||
ps.mtx.Lock()
|
||||
peerHeight := ps.height
|
||||
ps.mtx.Unlock()
|
||||
if peerHeight == height {
|
||||
f()
|
||||
}
|
||||
}()
|
||||
} else if peerRound == round {
|
||||
go f()
|
||||
} else {
|
||||
return
|
||||
}
|
||||
} else {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
// Messages
|
||||
|
||||
|
41
consensus/document.go
Normal file
41
consensus/document.go
Normal file
@ -0,0 +1,41 @@
|
||||
package consensus
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
. "github.com/tendermint/tendermint/config"
|
||||
)
|
||||
|
||||
func GenVoteDocument(voteType byte, height uint32, round uint16, proposalHash []byte) string {
|
||||
stepName := ""
|
||||
switch voteType {
|
||||
case VoteTypeBare:
|
||||
stepName = "bare"
|
||||
case VoteTypePrecommit:
|
||||
stepName = "precommit"
|
||||
case VoteTypeCommit:
|
||||
stepName = "commit"
|
||||
default:
|
||||
panic("Unknown vote type")
|
||||
}
|
||||
return fmt.Sprintf(
|
||||
`-----BEGIN TENDERMINT DOCUMENT-----
|
||||
URI: %v://consensus/%v/%v/%v
|
||||
ProposalHash: %X
|
||||
-----END TENDERMINT DOCUMENHT-----`,
|
||||
Config.Network, height, round, stepName,
|
||||
proposalHash,
|
||||
)
|
||||
}
|
||||
|
||||
func GenBlockPartDocument(height uint32, round uint16, index uint16, total uint16, blockPartHash []byte) string {
|
||||
return fmt.Sprintf(
|
||||
`-----BEGIN TENDERMINT DOCUMENT-----
|
||||
URI: %v://blockpart/%v/%v/%v
|
||||
Total: %v
|
||||
BlockPartHash: %X
|
||||
-----END TENDERMINT DOCUMENHT-----`,
|
||||
Config.Network, height, round, index,
|
||||
total,
|
||||
blockPartHash,
|
||||
)
|
||||
}
|
@ -1,20 +1,40 @@
|
||||
package consensus
|
||||
|
||||
import (
|
||||
. "github.com/tendermint/tendermint/blocks"
|
||||
db_ "github.com/tendermint/tendermint/db"
|
||||
. "github.com/tendermint/tendermint/state"
|
||||
)
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
// TODO: Ensure that double signing never happens via an external persistent check.
|
||||
type PrivValidator struct {
|
||||
PrivAccount
|
||||
db *db_.LevelDB
|
||||
}
|
||||
|
||||
// Returns new signed blockParts.
|
||||
// If signatures already exist in proposal BlockParts,
|
||||
// e.g. a locked proposal from some prior round,
|
||||
// those signatures are overwritten.
|
||||
// Double signing (signing multiple proposals for the same height&round) results in an error.
|
||||
func (pv *PrivValidator) SignProposal(round uint16, proposal *BlockPartSet) (err error) {
|
||||
//TODO: prevent double signing.
|
||||
blockParts := proposal.BlockParts()
|
||||
for i, part := range blockParts {
|
||||
partHash := part.Hash()
|
||||
doc := GenBlockPartDocument(
|
||||
proposal.Height(), round, uint16(i), uint16(len(blockParts)), partHash)
|
||||
part.Signature = pv.Sign([]byte(doc))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Modifies the vote object in memory.
|
||||
// Double signing results in an error.
|
||||
func (pv *PrivValidator) SignVote(vote *Vote) error {
|
||||
//TODO: prevent double signing.
|
||||
doc := GenVoteDocument(vote.Type, vote.Height, vote.Round, vote.Hash)
|
||||
vote.Signature = pv.Sign([]byte(doc))
|
||||
return nil
|
||||
}
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
. "github.com/tendermint/tendermint/blocks"
|
||||
. "github.com/tendermint/tendermint/common"
|
||||
. "github.com/tendermint/tendermint/state"
|
||||
)
|
||||
@ -131,7 +132,7 @@ type RoundState struct {
|
||||
Expires time.Time // Time after which this round is expired.
|
||||
Proposer *Validator // The proposer to propose a block for this round.
|
||||
Validators *ValidatorSet // All validators with modified accumPower for this round.
|
||||
BlockPartSet *BlockPartSet // All block parts received for this round.
|
||||
Proposal *BlockPartSet // All block parts received for this round.
|
||||
RoundBareVotes *VoteSet // All votes received for this round.
|
||||
RoundPrecommits *VoteSet // All precommits received for this round.
|
||||
Commits *VoteSet // A shared object for all commit votes of this height.
|
||||
@ -144,7 +145,7 @@ func NewRoundState(height uint32, round uint16, startTime time.Time,
|
||||
validators *ValidatorSet, commits *VoteSet) *RoundState {
|
||||
|
||||
proposer := validators.GetProposer()
|
||||
blockPartSet := NewBlockPartSet(height, round, &(proposer.Account))
|
||||
blockPartSet := NewBlockPartSet(height, nil)
|
||||
roundBareVotes := NewVoteSet(height, round, VoteTypeBare, validators)
|
||||
roundPrecommits := NewVoteSet(height, round, VoteTypePrecommit, validators)
|
||||
|
||||
@ -155,7 +156,7 @@ func NewRoundState(height uint32, round uint16, startTime time.Time,
|
||||
Expires: calcRoundStartTime(round+1, startTime),
|
||||
Proposer: proposer,
|
||||
Validators: validators,
|
||||
BlockPartSet: blockPartSet,
|
||||
Proposal: blockPartSet,
|
||||
RoundBareVotes: roundBareVotes,
|
||||
RoundPrecommits: roundPrecommits,
|
||||
Commits: commits,
|
||||
|
@ -3,13 +3,11 @@ package consensus
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
. "github.com/tendermint/tendermint/binary"
|
||||
. "github.com/tendermint/tendermint/blocks"
|
||||
"github.com/tendermint/tendermint/config"
|
||||
. "github.com/tendermint/tendermint/state"
|
||||
)
|
||||
|
||||
@ -30,7 +28,7 @@ var (
|
||||
// Represents a bare, precommit, or commit vote for proposals.
|
||||
type Vote struct {
|
||||
Height uint32
|
||||
Round uint16
|
||||
Round uint16 // zero if commit vote.
|
||||
Type byte
|
||||
Hash []byte // empty if vote is nil.
|
||||
Signature
|
||||
@ -55,45 +53,8 @@ func (v *Vote) WriteTo(w io.Writer) (n int64, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// This is the byteslice that validators should sign to signify a vote
|
||||
// for the given proposal at given height & round.
|
||||
// If hash is nil, the vote is a nil vote.
|
||||
func (v *Vote) GetDocument() []byte {
|
||||
switch v.Type {
|
||||
case VoteTypeBare:
|
||||
if len(v.Hash) == 0 {
|
||||
doc := fmt.Sprintf("%v://consensus/%v/%v/b\nnil",
|
||||
config.Config.Network, v.Height, v.Round)
|
||||
return []byte(doc)
|
||||
} else {
|
||||
doc := fmt.Sprintf("%v://consensus/%v/%v/b\n%v",
|
||||
config.Config.Network, v.Height, v.Round,
|
||||
CalcBlockURI(v.Height, v.Hash))
|
||||
return []byte(doc)
|
||||
}
|
||||
case VoteTypePrecommit:
|
||||
if len(v.Hash) == 0 {
|
||||
doc := fmt.Sprintf("%v://consensus/%v/%v/p\nnil",
|
||||
config.Config.Network, v.Height, v.Round)
|
||||
return []byte(doc)
|
||||
} else {
|
||||
doc := fmt.Sprintf("%v://consensus/%v/%v/p\n%v",
|
||||
config.Config.Network, v.Height, v.Round,
|
||||
CalcBlockURI(v.Height, v.Hash))
|
||||
return []byte(doc)
|
||||
}
|
||||
case VoteTypeCommit:
|
||||
if len(v.Hash) == 0 {
|
||||
panic("Commit hash cannot be nil")
|
||||
} else {
|
||||
doc := fmt.Sprintf("%v://consensus/%v/c\n%v",
|
||||
config.Config.Network, v.Height, // omit round info
|
||||
CalcBlockURI(v.Height, v.Hash))
|
||||
return []byte(doc)
|
||||
}
|
||||
default:
|
||||
panic("Unknown vote type")
|
||||
}
|
||||
func (v *Vote) GetDocument() string {
|
||||
return GenVoteDocument(v.Type, v.Height, v.Round, v.Hash)
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
@ -114,6 +75,9 @@ type VoteSet struct {
|
||||
|
||||
// Constructs a new VoteSet struct used to accumulate votes for each round.
|
||||
func NewVoteSet(height uint32, round uint16, type_ byte, validators *ValidatorSet) *VoteSet {
|
||||
if type_ == VoteTypeCommit && round != 0 {
|
||||
panic("Expected round 0 for commit vote set")
|
||||
}
|
||||
totalVotingPower := uint64(0)
|
||||
for _, val := range validators.Map() {
|
||||
totalVotingPower += val.VotingPower
|
||||
@ -137,7 +101,9 @@ func (vs *VoteSet) AddVote(vote *Vote) (bool, error) {
|
||||
defer vs.mtx.Unlock()
|
||||
|
||||
// Make sure the phase matches.
|
||||
if vote.Height != vs.height || vote.Round != vs.round || vote.Type != vs.type_ {
|
||||
if vote.Height != vs.height ||
|
||||
(vote.Type != VoteTypeCommit && vote.Round != vs.round) ||
|
||||
vote.Type != vs.type_ {
|
||||
return false, ErrVoteUnexpectedPhase
|
||||
}
|
||||
|
||||
@ -147,7 +113,7 @@ func (vs *VoteSet) AddVote(vote *Vote) (bool, error) {
|
||||
return false, ErrVoteInvalidAccount
|
||||
}
|
||||
// Check signature.
|
||||
if !val.Verify(vote.GetDocument(), vote.Signature.Bytes) {
|
||||
if !val.Verify([]byte(vote.GetDocument()), vote.Signature) {
|
||||
// Bad signature.
|
||||
return false, ErrVoteInvalidSignature
|
||||
}
|
||||
|
@ -8,12 +8,31 @@ import (
|
||||
)
|
||||
|
||||
func HashFromByteSlices(items [][]byte) []byte {
|
||||
panic("Implement me")
|
||||
return nil
|
||||
switch len(items) {
|
||||
case 0:
|
||||
panic("Cannot compute hash of empty slice")
|
||||
case 1:
|
||||
return items[0]
|
||||
default:
|
||||
var n int64
|
||||
var err error
|
||||
var hasher = sha256.New()
|
||||
hash := HashFromByteSlices(items[0 : len(items)/2])
|
||||
WriteByteSlice(hasher, hash, &n, &err)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
hash = HashFromByteSlices(items[len(items)/2:])
|
||||
WriteByteSlice(hasher, hash, &n, &err)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return hasher.Sum(nil)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Compute a deterministic merkle hash from a list of byteslices.
|
||||
Compute a deterministic merkle hash from a list of Binary objects.
|
||||
*/
|
||||
func HashFromBinarySlice(items []Binary) []byte {
|
||||
switch len(items) {
|
||||
|
@ -19,7 +19,10 @@ func ReadAccount(r io.Reader, n *int64, err *error) *Account {
|
||||
}
|
||||
}
|
||||
|
||||
func (self *Account) Verify(msg []byte, sig []byte) bool {
|
||||
func (self *Account) Verify(msg []byte, sig Signature) bool {
|
||||
if sig.SignerId != self.Id {
|
||||
return false
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@ -33,21 +36,3 @@ type PrivAccount struct {
|
||||
func (self *PrivAccount) Sign(msg []byte) Signature {
|
||||
return Signature{}
|
||||
}
|
||||
|
||||
/*
|
||||
// Signs the URI, which includes all data and metadata.
|
||||
// XXX implement or change
|
||||
func (bp *BlockPart) Sign(acc *PrivAccount) {
|
||||
// TODO: populate Signature
|
||||
}
|
||||
|
||||
// XXX maybe change.
|
||||
func (bp *BlockPart) ValidateWithSigner(signer *Account) error {
|
||||
// TODO: Sanity check height, index, total, bytes, etc.
|
||||
if !signer.Verify([]byte(bp.URI()), bp.Signature.Bytes) {
|
||||
return ErrInvalidBlockPartSignature
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
*/
|
||||
|
@ -1,19 +0,0 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
. "github.com/tendermint/tendermint/blocks"
|
||||
)
|
||||
|
||||
// XXX ugh, bad name.
|
||||
type StateStore struct {
|
||||
}
|
||||
|
||||
func (ss *StateStore) StageBlock(block *Block) error {
|
||||
// XXX implement staging.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ss *StateStore) CommitBlock(block *Block) error {
|
||||
// XXX implement staging.
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user