From dc1fddd1d88025c918d02c2fed097fa4874c8c2e Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 20 Apr 2015 15:29:01 -0700 Subject: [PATCH] Added NodeInfo (still need to add to AddrBook); Proper Handshake --- node/node.go | 35 ++++++++++++++++++++++++++++++++++- p2p/listener.go | 4 ++++ p2p/peer.go | 33 ++++++++++++++++++++++++++++++++- p2p/pex_reactor.go | 23 ++--------------------- p2p/switch.go | 33 +++++++++++++++++++-------------- p2p/switch_test.go | 17 ++++++++++++++--- rpc/core/net.go | 2 +- rpc/core/types/responses.go | 2 +- rpc/http_server.go | 7 +++++-- types/node.go | 9 +++++++++ 10 files changed, 121 insertions(+), 44 deletions(-) create mode 100644 types/node.go diff --git a/node/node.go b/node/node.go index df5bd5a2..cab2c397 100644 --- a/node/node.go +++ b/node/node.go @@ -1,8 +1,10 @@ package node import ( + "net" "net/http" "os" + "strconv" bc "github.com/tendermint/tendermint/blockchain" . "github.com/tendermint/tendermint/common" @@ -15,6 +17,7 @@ import ( "github.com/tendermint/tendermint/rpc" "github.com/tendermint/tendermint/rpc/core" sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/types" ) type Node struct { @@ -79,7 +82,6 @@ func NewNode() *Node { } sw := p2p.NewSwitch() - sw.SetNetwork(config.App().GetString("Network")) sw.AddReactor("PEX", pexReactor) sw.AddReactor("MEMPOOL", mempoolReactor) sw.AddReactor("BLOCKCHAIN", bcReactor) @@ -103,9 +105,12 @@ func NewNode() *Node { } } +// Call Start() after adding the listeners. func (n *Node) Start() { log.Info("Starting Node") n.book.Start() + nodeInfo := makeNodeInfo(n.sw) + n.sw.SetNodeInfo(nodeInfo) n.sw.Start() if config.App().GetBool("FastSync") { // TODO: When FastSync is done, start CONSENSUS. @@ -128,6 +133,8 @@ func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) { } // Add a Listener to accept inbound peer connections. +// Add listeners before starting the Node. +// The first listener is the primary listener (in NodeInfo) func (n *Node) AddListener(l p2p.Listener) { log.Info(Fmt("Added %v", l)) n.sw.AddListener(l) @@ -176,6 +183,32 @@ func (n *Node) EventSwitch() *events.EventSwitch { return n.evsw } +func makeNodeInfo(sw *p2p.Switch) *types.NodeInfo { + nodeInfo := &types.NodeInfo{ + Moniker: config.App().GetString("Moniker"), + Network: config.App().GetString("Network"), + } + if !sw.IsListening() { + return nodeInfo + } + p2pListener := sw.Listeners()[0] + p2pHost := p2pListener.ExternalAddress().IP.String() + p2pPort := p2pListener.ExternalAddress().Port + rpcListenAddr := config.App().GetString("RPC.HTTP.ListenAddr") + _, rpcPortStr, _ := net.SplitHostPort(rpcListenAddr) + rpcPort, err := strconv.Atoi(rpcPortStr) + if err != nil { + panic(Fmt("Expected numeric RPC.HTTP.ListenAddr port but got %v", rpcPortStr)) + } + + // We assume that the rpcListener has the same ExternalAddress. + // This is probably true because both P2P and RPC listeners use UPnP. + nodeInfo.Host = p2pHost + nodeInfo.P2PPort = p2pPort + nodeInfo.RPCPort = uint16(rpcPort) + return nodeInfo +} + //------------------------------------------------------------------------------ func RunNode() { diff --git a/p2p/listener.go b/p2p/listener.go index cc27c0c7..48557b97 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -128,6 +128,10 @@ func (l *DefaultListener) ExternalAddress() *NetAddress { return l.extAddr } +func (l *DefaultListener) NetListener() net.Listener { + return l.listener +} + func (l *DefaultListener) Stop() { if atomic.CompareAndSwapUint32(&l.stopped, 0, 1) { l.listener.Close() diff --git a/p2p/peer.go b/p2p/peer.go index 173297eb..549200e2 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -4,10 +4,12 @@ import ( "fmt" "io" "net" + "sync" "sync/atomic" "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/types" ) type Peer struct { @@ -15,11 +17,39 @@ type Peer struct { mconn *MConnection running uint32 + *types.NodeInfo Key string Data *CMap // User data. } -func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer { +func peerHandshake(conn net.Conn, ourNodeInfo *types.NodeInfo) (*types.NodeInfo, error) { + var peerNodeInfo = new(types.NodeInfo) + var wg sync.WaitGroup + var err1 error + var err2 error + wg.Add(2) + go func() { + var n int64 + binary.WriteBinary(ourNodeInfo, conn, &n, &err1) + wg.Done() + }() + go func() { + var n int64 + binary.ReadBinary(peerNodeInfo, conn, &n, &err2) + log.Info("Peer handshake", "peerNodeInfo", peerNodeInfo) + wg.Done() + }() + wg.Wait() + if err1 != nil { + return nil, err1 + } + if err2 != nil { + return nil, err2 + } + return peerNodeInfo, nil +} + +func newPeer(conn net.Conn, peerNodeInfo *types.NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer { var p *Peer onReceive := func(chId byte, msgBytes []byte) { reactor := reactorsByCh[chId] @@ -37,6 +67,7 @@ func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDesc outbound: outbound, mconn: mconn, running: 0, + NodeInfo: peerNodeInfo, Key: mconn.RemoteAddress.String(), Data: NewCMap(), } diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go index 8a3ba644..0f9766d6 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex_reactor.go @@ -101,12 +101,6 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) { log.Info("Received message", "msg", msg) switch msg.(type) { - case *pexHandshakeMessage: - network := msg.(*pexHandshakeMessage).Network - if network != pexR.sw.network { - err := fmt.Sprintf("Peer is on a different chain/network. Got %s, expected %s", network, pexR.sw.network) - pexR.sw.StopPeerForError(src, err) - } case *pexRequestMessage: // src requested some peers. // TODO: prevent abuse. @@ -219,16 +213,14 @@ func (pexR *PEXReactor) SetFireable(evsw events.Fireable) { // Messages const ( - msgTypeRequest = byte(0x01) - msgTypeAddrs = byte(0x02) - msgTypeHandshake = byte(0x03) + msgTypeRequest = byte(0x01) + msgTypeAddrs = byte(0x02) ) type PexMessage interface{} var _ = binary.RegisterInterface( struct{ PexMessage }{}, - binary.ConcreteType{&pexHandshakeMessage{}, msgTypeHandshake}, binary.ConcreteType{&pexRequestMessage{}, msgTypeRequest}, binary.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs}, ) @@ -241,17 +233,6 @@ func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) { return } -/* -A pexHandshakeMessage contains the network identifier. -*/ -type pexHandshakeMessage struct { - Network string -} - -func (m *pexHandshakeMessage) String() string { - return "[pexHandshake]" -} - /* A pexRequestMessage requests additional peer addresses. */ diff --git a/p2p/switch.go b/p2p/switch.go index a07c026f..029c278d 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -8,6 +8,7 @@ import ( "time" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/types" ) type Reactor interface { @@ -39,7 +40,6 @@ or more `Channels`. So while sending outgoing messages is typically performed o incoming messages are received on the reactor. */ type Switch struct { - network string listeners []Listener reactors map[string]Reactor chDescs []*ChannelDescriptor @@ -47,6 +47,7 @@ type Switch struct { peers *PeerSet dialing *CMap running uint32 + nodeInfo *types.NodeInfo // our node info } var ( @@ -58,25 +59,18 @@ const ( ) func NewSwitch() *Switch { - sw := &Switch{ - network: "", reactors: make(map[string]Reactor), chDescs: make([]*ChannelDescriptor, 0), reactorsByCh: make(map[byte]Reactor), peers: NewPeerSet(), dialing: NewCMap(), running: 0, + nodeInfo: nil, } - return sw } -// Not goroutine safe. -func (sw *Switch) SetNetwork(network string) { - sw.network = network -} - // Not goroutine safe. func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { // Validate the reactor. @@ -109,6 +103,7 @@ func (sw *Switch) AddListener(l Listener) { sw.listeners = append(sw.listeners, l) } +// Not goroutine safe. func (sw *Switch) Listeners() []Listener { return sw.listeners } @@ -118,6 +113,11 @@ func (sw *Switch) IsListening() bool { return len(sw.listeners) > 0 } +// Not goroutine safe. +func (sw *Switch) SetNodeInfo(nodeInfo *types.NodeInfo) { + sw.nodeInfo = nodeInfo +} + func (sw *Switch) Start() { if atomic.CompareAndSwapUint32(&sw.running, 0, 1) { // Start reactors @@ -154,8 +154,17 @@ func (sw *Switch) Stop() { } } +// NOTE: This performs a blocking handshake before the peer is added. func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { - peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) + // First, perform handshake + peerNodeInfo, err := peerHandshake(conn, sw.nodeInfo) + if err != nil { + return nil, err + } + if peerNodeInfo.Network != sw.nodeInfo.Network { + return nil, fmt.Errorf("Peer is on different network %v", peerNodeInfo.Network) + } + peer := newPeer(conn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) // Add the peer to .peers if sw.peers.Add(peer) { @@ -177,10 +186,6 @@ func (sw *Switch) startInitPeer(peer *Peer) { // Notify reactors sw.doAddPeer(peer) - - // Send handshake - msg := &pexHandshakeMessage{Network: sw.network} - peer.Send(PexChannel, msg) } func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { diff --git a/p2p/switch_test.go b/p2p/switch_test.go index d66c039a..fc3ed41d 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -8,6 +8,7 @@ import ( "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/types" ) type PeerMessage struct { @@ -73,7 +74,17 @@ func makeSwitchPair(t testing.TB, initSwitch func(*Switch) *Switch) (*Switch, *S // Create two switches that will be interconnected. s1 := initSwitch(NewSwitch()) + s1.SetNodeInfo(&types.NodeInfo{ + Moniker: "switch1", + Network: "testing", + }) s2 := initSwitch(NewSwitch()) + s2.SetNodeInfo(&types.NodeInfo{ + Moniker: "switch2", + Network: "testing", + }) + + // Start switches s1.Start() s2.Start() @@ -93,7 +104,7 @@ func makeSwitchPair(t testing.TB, initSwitch func(*Switch) *Switch) (*Switch, *S t.Fatalf("Could not get inbound connection from listener") } - s1.AddPeerWithConnection(connIn, false) + go s1.AddPeerWithConnection(connIn, false) // AddPeer is blocking, requires handshake. s2.AddPeerWithConnection(connOut, true) // Wait for things to happen, peers to get added... @@ -142,10 +153,10 @@ func TestSwitches(t *testing.T) { // Check message on ch0 ch0Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x00)] - if len(ch0Msgs) != 2 { + if len(ch0Msgs) != 1 { t.Errorf("Expected to have received 1 message in ch0") } - if !bytes.Equal(ch0Msgs[1].Bytes, binary.BinaryBytes(ch0Msg)) { + if !bytes.Equal(ch0Msgs[0].Bytes, binary.BinaryBytes(ch0Msg)) { t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", binary.BinaryBytes(ch0Msg), ch0Msgs[0].Bytes) } diff --git a/rpc/core/net.go b/rpc/core/net.go index 3fcb37e1..7f852f34 100644 --- a/rpc/core/net.go +++ b/rpc/core/net.go @@ -42,7 +42,7 @@ func NetInfo() (*ctypes.ResponseNetInfo, error) { peers := []ctypes.Peer{} for _, peer := range p2pSwitch.Peers().List() { peers = append(peers, ctypes.Peer{ - Address: peer.Connection().RemoteAddress.String(), + NodeInfo: *peer.NodeInfo, IsOutbound: peer.IsOutbound(), }) } diff --git a/rpc/core/types/responses.go b/rpc/core/types/responses.go index e7c71e25..fa031e04 100644 --- a/rpc/core/types/responses.go +++ b/rpc/core/types/responses.go @@ -78,7 +78,7 @@ type ResponseNetInfo struct { } type Peer struct { - Address string + types.NodeInfo IsOutbound bool } diff --git a/rpc/http_server.go b/rpc/http_server.go index 5eaff856..939ee758 100644 --- a/rpc/http_server.go +++ b/rpc/http_server.go @@ -13,13 +13,16 @@ import ( "github.com/tendermint/tendermint/alert" "github.com/tendermint/tendermint/binary" . "github.com/tendermint/tendermint/common" + "github.com/tendermint/tendermint/p2p" ) func StartHTTPServer(listenAddr string, handler http.Handler) { log.Info(Fmt("Starting RPC HTTP server on %s", listenAddr)) go func() { - res := http.ListenAndServe( - listenAddr, + listener := p2p.NewDefaultListener("tcp", listenAddr, false) + netListener := listener.(*p2p.DefaultListener).NetListener() + res := http.Serve( + netListener, RecoverAndLogHandler(handler), ) log.Crit("RPC HTTPServer stopped", "result", res) diff --git a/types/node.go b/types/node.go new file mode 100644 index 00000000..7a3c41a7 --- /dev/null +++ b/types/node.go @@ -0,0 +1,9 @@ +package types + +type NodeInfo struct { + Moniker string + Network string + Host string + P2PPort uint16 + RPCPort uint16 +}