Added NodeInfo (still need to add to AddrBook); Proper Handshake

This commit is contained in:
Jae Kwon
2015-04-20 15:29:01 -07:00
parent 4948fe7725
commit dc1fddd1d8
10 changed files with 121 additions and 44 deletions

View File

@ -1,8 +1,10 @@
package node package node
import ( import (
"net"
"net/http" "net/http"
"os" "os"
"strconv"
bc "github.com/tendermint/tendermint/blockchain" bc "github.com/tendermint/tendermint/blockchain"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
@ -15,6 +17,7 @@ import (
"github.com/tendermint/tendermint/rpc" "github.com/tendermint/tendermint/rpc"
"github.com/tendermint/tendermint/rpc/core" "github.com/tendermint/tendermint/rpc/core"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
) )
type Node struct { type Node struct {
@ -79,7 +82,6 @@ func NewNode() *Node {
} }
sw := p2p.NewSwitch() sw := p2p.NewSwitch()
sw.SetNetwork(config.App().GetString("Network"))
sw.AddReactor("PEX", pexReactor) sw.AddReactor("PEX", pexReactor)
sw.AddReactor("MEMPOOL", mempoolReactor) sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor) sw.AddReactor("BLOCKCHAIN", bcReactor)
@ -103,9 +105,12 @@ func NewNode() *Node {
} }
} }
// Call Start() after adding the listeners.
func (n *Node) Start() { func (n *Node) Start() {
log.Info("Starting Node") log.Info("Starting Node")
n.book.Start() n.book.Start()
nodeInfo := makeNodeInfo(n.sw)
n.sw.SetNodeInfo(nodeInfo)
n.sw.Start() n.sw.Start()
if config.App().GetBool("FastSync") { if config.App().GetBool("FastSync") {
// TODO: When FastSync is done, start CONSENSUS. // TODO: When FastSync is done, start CONSENSUS.
@ -128,6 +133,8 @@ func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) {
} }
// Add a Listener to accept inbound peer connections. // Add a Listener to accept inbound peer connections.
// Add listeners before starting the Node.
// The first listener is the primary listener (in NodeInfo)
func (n *Node) AddListener(l p2p.Listener) { func (n *Node) AddListener(l p2p.Listener) {
log.Info(Fmt("Added %v", l)) log.Info(Fmt("Added %v", l))
n.sw.AddListener(l) n.sw.AddListener(l)
@ -176,6 +183,32 @@ func (n *Node) EventSwitch() *events.EventSwitch {
return n.evsw return n.evsw
} }
func makeNodeInfo(sw *p2p.Switch) *types.NodeInfo {
nodeInfo := &types.NodeInfo{
Moniker: config.App().GetString("Moniker"),
Network: config.App().GetString("Network"),
}
if !sw.IsListening() {
return nodeInfo
}
p2pListener := sw.Listeners()[0]
p2pHost := p2pListener.ExternalAddress().IP.String()
p2pPort := p2pListener.ExternalAddress().Port
rpcListenAddr := config.App().GetString("RPC.HTTP.ListenAddr")
_, rpcPortStr, _ := net.SplitHostPort(rpcListenAddr)
rpcPort, err := strconv.Atoi(rpcPortStr)
if err != nil {
panic(Fmt("Expected numeric RPC.HTTP.ListenAddr port but got %v", rpcPortStr))
}
// We assume that the rpcListener has the same ExternalAddress.
// This is probably true because both P2P and RPC listeners use UPnP.
nodeInfo.Host = p2pHost
nodeInfo.P2PPort = p2pPort
nodeInfo.RPCPort = uint16(rpcPort)
return nodeInfo
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
func RunNode() { func RunNode() {

View File

@ -128,6 +128,10 @@ func (l *DefaultListener) ExternalAddress() *NetAddress {
return l.extAddr return l.extAddr
} }
func (l *DefaultListener) NetListener() net.Listener {
return l.listener
}
func (l *DefaultListener) Stop() { func (l *DefaultListener) Stop() {
if atomic.CompareAndSwapUint32(&l.stopped, 0, 1) { if atomic.CompareAndSwapUint32(&l.stopped, 0, 1) {
l.listener.Close() l.listener.Close()

View File

@ -4,10 +4,12 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"sync"
"sync/atomic" "sync/atomic"
"github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/types"
) )
type Peer struct { type Peer struct {
@ -15,11 +17,39 @@ type Peer struct {
mconn *MConnection mconn *MConnection
running uint32 running uint32
*types.NodeInfo
Key string Key string
Data *CMap // User data. Data *CMap // User data.
} }
func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer { func peerHandshake(conn net.Conn, ourNodeInfo *types.NodeInfo) (*types.NodeInfo, error) {
var peerNodeInfo = new(types.NodeInfo)
var wg sync.WaitGroup
var err1 error
var err2 error
wg.Add(2)
go func() {
var n int64
binary.WriteBinary(ourNodeInfo, conn, &n, &err1)
wg.Done()
}()
go func() {
var n int64
binary.ReadBinary(peerNodeInfo, conn, &n, &err2)
log.Info("Peer handshake", "peerNodeInfo", peerNodeInfo)
wg.Done()
}()
wg.Wait()
if err1 != nil {
return nil, err1
}
if err2 != nil {
return nil, err2
}
return peerNodeInfo, nil
}
func newPeer(conn net.Conn, peerNodeInfo *types.NodeInfo, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{})) *Peer {
var p *Peer var p *Peer
onReceive := func(chId byte, msgBytes []byte) { onReceive := func(chId byte, msgBytes []byte) {
reactor := reactorsByCh[chId] reactor := reactorsByCh[chId]
@ -37,6 +67,7 @@ func newPeer(conn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDesc
outbound: outbound, outbound: outbound,
mconn: mconn, mconn: mconn,
running: 0, running: 0,
NodeInfo: peerNodeInfo,
Key: mconn.RemoteAddress.String(), Key: mconn.RemoteAddress.String(),
Data: NewCMap(), Data: NewCMap(),
} }

View File

@ -101,12 +101,6 @@ func (pexR *PEXReactor) Receive(chId byte, src *Peer, msgBytes []byte) {
log.Info("Received message", "msg", msg) log.Info("Received message", "msg", msg)
switch msg.(type) { switch msg.(type) {
case *pexHandshakeMessage:
network := msg.(*pexHandshakeMessage).Network
if network != pexR.sw.network {
err := fmt.Sprintf("Peer is on a different chain/network. Got %s, expected %s", network, pexR.sw.network)
pexR.sw.StopPeerForError(src, err)
}
case *pexRequestMessage: case *pexRequestMessage:
// src requested some peers. // src requested some peers.
// TODO: prevent abuse. // TODO: prevent abuse.
@ -219,16 +213,14 @@ func (pexR *PEXReactor) SetFireable(evsw events.Fireable) {
// Messages // Messages
const ( const (
msgTypeRequest = byte(0x01) msgTypeRequest = byte(0x01)
msgTypeAddrs = byte(0x02) msgTypeAddrs = byte(0x02)
msgTypeHandshake = byte(0x03)
) )
type PexMessage interface{} type PexMessage interface{}
var _ = binary.RegisterInterface( var _ = binary.RegisterInterface(
struct{ PexMessage }{}, struct{ PexMessage }{},
binary.ConcreteType{&pexHandshakeMessage{}, msgTypeHandshake},
binary.ConcreteType{&pexRequestMessage{}, msgTypeRequest}, binary.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
binary.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs}, binary.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
) )
@ -241,17 +233,6 @@ func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
return return
} }
/*
A pexHandshakeMessage contains the network identifier.
*/
type pexHandshakeMessage struct {
Network string
}
func (m *pexHandshakeMessage) String() string {
return "[pexHandshake]"
}
/* /*
A pexRequestMessage requests additional peer addresses. A pexRequestMessage requests additional peer addresses.
*/ */

View File

@ -8,6 +8,7 @@ import (
"time" "time"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/types"
) )
type Reactor interface { type Reactor interface {
@ -39,7 +40,6 @@ or more `Channels`. So while sending outgoing messages is typically performed o
incoming messages are received on the reactor. incoming messages are received on the reactor.
*/ */
type Switch struct { type Switch struct {
network string
listeners []Listener listeners []Listener
reactors map[string]Reactor reactors map[string]Reactor
chDescs []*ChannelDescriptor chDescs []*ChannelDescriptor
@ -47,6 +47,7 @@ type Switch struct {
peers *PeerSet peers *PeerSet
dialing *CMap dialing *CMap
running uint32 running uint32
nodeInfo *types.NodeInfo // our node info
} }
var ( var (
@ -58,25 +59,18 @@ const (
) )
func NewSwitch() *Switch { func NewSwitch() *Switch {
sw := &Switch{ sw := &Switch{
network: "",
reactors: make(map[string]Reactor), reactors: make(map[string]Reactor),
chDescs: make([]*ChannelDescriptor, 0), chDescs: make([]*ChannelDescriptor, 0),
reactorsByCh: make(map[byte]Reactor), reactorsByCh: make(map[byte]Reactor),
peers: NewPeerSet(), peers: NewPeerSet(),
dialing: NewCMap(), dialing: NewCMap(),
running: 0, running: 0,
nodeInfo: nil,
} }
return sw return sw
} }
// Not goroutine safe.
func (sw *Switch) SetNetwork(network string) {
sw.network = network
}
// Not goroutine safe. // Not goroutine safe.
func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
// Validate the reactor. // Validate the reactor.
@ -109,6 +103,7 @@ func (sw *Switch) AddListener(l Listener) {
sw.listeners = append(sw.listeners, l) sw.listeners = append(sw.listeners, l)
} }
// Not goroutine safe.
func (sw *Switch) Listeners() []Listener { func (sw *Switch) Listeners() []Listener {
return sw.listeners return sw.listeners
} }
@ -118,6 +113,11 @@ func (sw *Switch) IsListening() bool {
return len(sw.listeners) > 0 return len(sw.listeners) > 0
} }
// Not goroutine safe.
func (sw *Switch) SetNodeInfo(nodeInfo *types.NodeInfo) {
sw.nodeInfo = nodeInfo
}
func (sw *Switch) Start() { func (sw *Switch) Start() {
if atomic.CompareAndSwapUint32(&sw.running, 0, 1) { if atomic.CompareAndSwapUint32(&sw.running, 0, 1) {
// Start reactors // Start reactors
@ -154,8 +154,17 @@ func (sw *Switch) Stop() {
} }
} }
// NOTE: This performs a blocking handshake before the peer is added.
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) { func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
peer := newPeer(conn, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) // First, perform handshake
peerNodeInfo, err := peerHandshake(conn, sw.nodeInfo)
if err != nil {
return nil, err
}
if peerNodeInfo.Network != sw.nodeInfo.Network {
return nil, fmt.Errorf("Peer is on different network %v", peerNodeInfo.Network)
}
peer := newPeer(conn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
// Add the peer to .peers // Add the peer to .peers
if sw.peers.Add(peer) { if sw.peers.Add(peer) {
@ -177,10 +186,6 @@ func (sw *Switch) startInitPeer(peer *Peer) {
// Notify reactors // Notify reactors
sw.doAddPeer(peer) sw.doAddPeer(peer)
// Send handshake
msg := &pexHandshakeMessage{Network: sw.network}
peer.Send(PexChannel, msg)
} }
func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {

View File

@ -8,6 +8,7 @@ import (
"github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/types"
) )
type PeerMessage struct { type PeerMessage struct {
@ -73,7 +74,17 @@ func makeSwitchPair(t testing.TB, initSwitch func(*Switch) *Switch) (*Switch, *S
// Create two switches that will be interconnected. // Create two switches that will be interconnected.
s1 := initSwitch(NewSwitch()) s1 := initSwitch(NewSwitch())
s1.SetNodeInfo(&types.NodeInfo{
Moniker: "switch1",
Network: "testing",
})
s2 := initSwitch(NewSwitch()) s2 := initSwitch(NewSwitch())
s2.SetNodeInfo(&types.NodeInfo{
Moniker: "switch2",
Network: "testing",
})
// Start switches
s1.Start() s1.Start()
s2.Start() s2.Start()
@ -93,7 +104,7 @@ func makeSwitchPair(t testing.TB, initSwitch func(*Switch) *Switch) (*Switch, *S
t.Fatalf("Could not get inbound connection from listener") t.Fatalf("Could not get inbound connection from listener")
} }
s1.AddPeerWithConnection(connIn, false) go s1.AddPeerWithConnection(connIn, false) // AddPeer is blocking, requires handshake.
s2.AddPeerWithConnection(connOut, true) s2.AddPeerWithConnection(connOut, true)
// Wait for things to happen, peers to get added... // Wait for things to happen, peers to get added...
@ -142,10 +153,10 @@ func TestSwitches(t *testing.T) {
// Check message on ch0 // Check message on ch0
ch0Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x00)] ch0Msgs := s2.Reactor("foo").(*TestReactor).msgsReceived[byte(0x00)]
if len(ch0Msgs) != 2 { if len(ch0Msgs) != 1 {
t.Errorf("Expected to have received 1 message in ch0") t.Errorf("Expected to have received 1 message in ch0")
} }
if !bytes.Equal(ch0Msgs[1].Bytes, binary.BinaryBytes(ch0Msg)) { if !bytes.Equal(ch0Msgs[0].Bytes, binary.BinaryBytes(ch0Msg)) {
t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", binary.BinaryBytes(ch0Msg), ch0Msgs[0].Bytes) t.Errorf("Unexpected message bytes. Wanted: %X, Got: %X", binary.BinaryBytes(ch0Msg), ch0Msgs[0].Bytes)
} }

View File

@ -42,7 +42,7 @@ func NetInfo() (*ctypes.ResponseNetInfo, error) {
peers := []ctypes.Peer{} peers := []ctypes.Peer{}
for _, peer := range p2pSwitch.Peers().List() { for _, peer := range p2pSwitch.Peers().List() {
peers = append(peers, ctypes.Peer{ peers = append(peers, ctypes.Peer{
Address: peer.Connection().RemoteAddress.String(), NodeInfo: *peer.NodeInfo,
IsOutbound: peer.IsOutbound(), IsOutbound: peer.IsOutbound(),
}) })
} }

View File

@ -78,7 +78,7 @@ type ResponseNetInfo struct {
} }
type Peer struct { type Peer struct {
Address string types.NodeInfo
IsOutbound bool IsOutbound bool
} }

View File

@ -13,13 +13,16 @@ import (
"github.com/tendermint/tendermint/alert" "github.com/tendermint/tendermint/alert"
"github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/binary"
. "github.com/tendermint/tendermint/common" . "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/p2p"
) )
func StartHTTPServer(listenAddr string, handler http.Handler) { func StartHTTPServer(listenAddr string, handler http.Handler) {
log.Info(Fmt("Starting RPC HTTP server on %s", listenAddr)) log.Info(Fmt("Starting RPC HTTP server on %s", listenAddr))
go func() { go func() {
res := http.ListenAndServe( listener := p2p.NewDefaultListener("tcp", listenAddr, false)
listenAddr, netListener := listener.(*p2p.DefaultListener).NetListener()
res := http.Serve(
netListener,
RecoverAndLogHandler(handler), RecoverAndLogHandler(handler),
) )
log.Crit("RPC HTTPServer stopped", "result", res) log.Crit("RPC HTTPServer stopped", "result", res)

9
types/node.go Normal file
View File

@ -0,0 +1,9 @@
package types
type NodeInfo struct {
Moniker string
Network string
Host string
P2PPort uint16
RPCPort uint16
}