peer interface

This commit is contained in:
Ethan Buchman
2017-09-12 20:49:22 -04:00
parent 54c63726b0
commit aea8629272
15 changed files with 195 additions and 148 deletions

View File

@@ -120,14 +120,14 @@ func (conR *ConsensusReactor) GetChannels() []*p2p.ChannelDescriptor {
}
// AddPeer implements Reactor
func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
func (conR *ConsensusReactor) AddPeer(peer p2p.Peer) {
if !conR.IsRunning() {
return
}
// Create peerState for peer
peerState := NewPeerState(peer)
peer.Data.Set(types.PeerStateKey, peerState)
peerState := NewPeerState(peer).SetLogger(conR.Logger)
peer.Set(types.PeerStateKey, peerState)
// Begin routines for this peer.
go conR.gossipDataRoutine(peer, peerState)
@@ -142,12 +142,12 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) {
}
// RemovePeer implements Reactor
func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
func (conR *ConsensusReactor) RemovePeer(peer p2p.Peer, reason interface{}) {
if !conR.IsRunning() {
return
}
// TODO
//peer.Data.Get(PeerStateKey).(*PeerState).Disconnect()
//peer.Get(PeerStateKey).(*PeerState).Disconnect()
}
// Receive implements Reactor
@@ -156,7 +156,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
// Peer state updates can happen in parallel, but processing of
// proposals, block parts, and votes are ordered by the receiveRoutine
// NOTE: blocks on consensus state for proposals, block parts, and votes
func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
func (conR *ConsensusReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
if !conR.IsRunning() {
conR.Logger.Debug("Receive", "src", src, "chId", chID, "bytes", msgBytes)
return
@@ -171,7 +171,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
conR.Logger.Debug("Receive", "src", src, "chId", chID, "msg", msg)
// Get peer states
ps := src.Data.Get(types.PeerStateKey).(*PeerState)
ps := src.Get(types.PeerStateKey).(*PeerState)
switch chID {
case StateChannel:
@@ -191,7 +191,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
return
}
// Peer claims to have a maj23 for some BlockID at H,R,S,
votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID)
votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key(), msg.BlockID)
// Respond with a VoteSetBitsMessage showing which votes we have.
// (and consequently shows which we don't have)
var ourVotes *cmn.BitArray
@@ -228,12 +228,12 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
switch msg := msg.(type) {
case *ProposalMessage:
ps.SetHasProposal(msg.Proposal)
conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
conR.conS.peerMsgQueue <- msgInfo{msg, src.Key()}
case *ProposalPOLMessage:
ps.ApplyProposalPOLMessage(msg)
case *BlockPartMessage:
ps.SetHasProposalBlockPart(msg.Height, msg.Round, msg.Part.Index)
conR.conS.peerMsgQueue <- msgInfo{msg, src.Key}
conR.conS.peerMsgQueue <- msgInfo{msg, src.Key()}
default:
conR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
}
@@ -253,7 +253,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte)
ps.EnsureVoteBitArrays(height-1, lastCommitSize)
ps.SetHasVote(msg.Vote)
cs.peerMsgQueue <- msgInfo{msg, src.Key}
cs.peerMsgQueue <- msgInfo{msg, src.Key()}
default:
// don't punish (leave room for soft upgrades)
@@ -367,7 +367,7 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote) {
/*
// TODO: Make this broadcast more selective.
for _, peer := range conR.Switch.Peers().List() {
ps := peer.Data.Get(PeerStateKey).(*PeerState)
ps := peer.Get(PeerStateKey).(*PeerState)
prs := ps.GetRoundState()
if prs.Height == vote.Height {
// TODO: Also filter on round?
@@ -399,7 +399,7 @@ func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *
return
}
func (conR *ConsensusReactor) sendNewRoundStepMessages(peer *p2p.Peer) {
func (conR *ConsensusReactor) sendNewRoundStepMessages(peer p2p.Peer) {
rs := conR.conS.GetRoundState()
nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil {
@@ -410,7 +410,7 @@ func (conR *ConsensusReactor) sendNewRoundStepMessages(peer *p2p.Peer) {
}
}
func (conR *ConsensusReactor) gossipDataRoutine(peer *p2p.Peer, ps *PeerState) {
func (conR *ConsensusReactor) gossipDataRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
OUTER_LOOP:
@@ -492,7 +492,7 @@ OUTER_LOOP:
}
func (conR *ConsensusReactor) gossipDataForCatchup(logger log.Logger, rs *RoundState,
prs *PeerRoundState, ps *PeerState, peer *p2p.Peer) {
prs *PeerRoundState, ps *PeerState, peer p2p.Peer) {
if index, ok := prs.ProposalBlockParts.Not().PickRandom(); ok {
// Ensure that the peer's PartSetHeader is correct
@@ -534,7 +534,7 @@ func (conR *ConsensusReactor) gossipDataForCatchup(logger log.Logger, rs *RoundS
}
}
func (conR *ConsensusReactor) gossipVotesRoutine(peer *p2p.Peer, ps *PeerState) {
func (conR *ConsensusReactor) gossipVotesRoutine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
// Simple hack to throttle logs upon sleep.
@@ -644,7 +644,7 @@ func (conR *ConsensusReactor) gossipVotesForHeight(logger log.Logger, rs *RoundS
// NOTE: `queryMaj23Routine` has a simple crude design since it only comes
// into play for liveness when there's a signature DDoS attack happening.
func (conR *ConsensusReactor) queryMaj23Routine(peer *p2p.Peer, ps *PeerState) {
func (conR *ConsensusReactor) queryMaj23Routine(peer p2p.Peer, ps *PeerState) {
logger := conR.Logger.With("peer", peer)
OUTER_LOOP:
@@ -743,7 +743,7 @@ func (conR *ConsensusReactor) StringIndented(indent string) string {
s := "ConsensusReactor{\n"
s += indent + " " + conR.conS.StringIndented(indent+" ") + "\n"
for _, peer := range conR.Switch.Peers().List() {
ps := peer.Data.Get(types.PeerStateKey).(*PeerState)
ps := peer.Get(types.PeerStateKey).(*PeerState)
s += indent + " " + ps.StringIndented(indent+" ") + "\n"
}
s += indent + "}"
@@ -808,16 +808,18 @@ var (
// PeerState contains the known state of a peer, including its connection
// and threadsafe access to its PeerRoundState.
type PeerState struct {
Peer *p2p.Peer
Peer p2p.Peer
logger log.Logger
mtx sync.Mutex
PeerRoundState
}
// NewPeerState returns a new PeerState for the given Peer
func NewPeerState(peer *p2p.Peer) *PeerState {
func NewPeerState(peer p2p.Peer) *PeerState {
return &PeerState{
Peer: peer,
Peer: peer,
logger: log.NewNopLogger(),
PeerRoundState: PeerRoundState{
Round: -1,
ProposalPOLRound: -1,
@@ -827,6 +829,11 @@ func NewPeerState(peer *p2p.Peer) *PeerState {
}
}
func (ps *PeerState) SetLogger(logger log.Logger) *PeerState {
ps.logger = logger
return ps
}
// GetRoundState returns an atomic snapshot of the PeerRoundState.
// There's no point in mutating it since it won't change PeerState.
func (ps *PeerState) GetRoundState() *PeerRoundState {
@@ -1025,7 +1032,7 @@ func (ps *PeerState) SetHasVote(vote *types.Vote) {
}
func (ps *PeerState) setHasVote(height int, round int, type_ byte, index int) {
logger := ps.Peer.Logger.With("peerRound", ps.Round, "height", height, "round", round)
logger := ps.logger.With("peerRound", ps.Round, "height", height, "round", round)
logger.Debug("setHasVote(LastCommit)", "lastCommit", ps.LastCommit, "index", index)
// NOTE: some may be nil BitArrays -> no side effects.
@@ -1163,7 +1170,7 @@ func (ps *PeerState) StringIndented(indent string) string {
%s Key %v
%s PRS %v
%s}`,
indent, ps.Peer.Key,
indent, ps.Peer.Key(),
indent, ps.PeerRoundState.StringIndented(indent+" "),
indent)
}