From be5d68ea4fd5ea249d4598f82fd4a35c2d84cace Mon Sep 17 00:00:00 2001 From: Alexander Simmerl Date: Tue, 18 Sep 2018 22:11:54 +0200 Subject: [PATCH] p2p: Implement PeerTransport This is the implementation for the design described in ADR 12[0]. It's the first step of a larger refactor of the p2p package as tracked in interface bundling all concerns of low-level connection handling and isolating the rest of peer lifecycle management from the specifics of the low-level internet protocols. Even if the swappable implementation will never be utilised, already the isolation of conn related code in one place will help with the reasoning about execution path and addressation of security sensitive issues surfaced through bounty programs and audits. We deliberately decided to not have Peer filtering and other management in the Transport, its sole responsibility is the translation of connections to Peers, handing those to the caller fully setup. It's the responsibility of the caller to reject those and or keep track. Peer filtering will take place in the Switch and can be inspected in a the following commit. This changeset additionally is an exercise in clean separation of logic and other infrastructural concerns like logging and instrumentation. By leveraging a clean and minimal interface. How this looks can be seen in a follow-up change. Design #2069[2] Refs #2067[3] Fixes #2047[4] Fixes #2046[5] changes: * describe Transport interface * implement new default Transport: MultiplexTransport * test MultiplexTransport with new constraints * implement ConnSet for concurrent management of net.Conn, synchronous to PeerSet * implement and expose duplicate IP filter * implemnt TransportOption for optional parametirisation [0] https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-012-peer-transport.md [1] https://github.com/tendermint/tendermint/issues/2067 [2] https://github.com/tendermint/tendermint/pull/2069 [3] https://github.com/tendermint/tendermint/issues/2067 [4] https://github.com/tendermint/tendermint/issues/2047 [5] https://github.com/tendermint/tendermint/issues/2046 --- p2p/conn_set.go | 73 +++++ p2p/transport.go | 494 ++++++++++++++++++++++++++++++++ p2p/transport_test.go | 636 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 1203 insertions(+) create mode 100644 p2p/conn_set.go create mode 100644 p2p/transport.go create mode 100644 p2p/transport_test.go diff --git a/p2p/conn_set.go b/p2p/conn_set.go new file mode 100644 index 00000000..f960c0e8 --- /dev/null +++ b/p2p/conn_set.go @@ -0,0 +1,73 @@ +package p2p + +import ( + "net" + "sync" +) + +// ConnSet is a lookup table for connections and all their ips. +type ConnSet interface { + Has(net.Conn) bool + HasIP(net.IP) bool + Set(net.Conn, []net.IP) + Remove(net.Conn) +} + +type connSetItem struct { + conn net.Conn + ips []net.IP +} + +type connSet struct { + sync.RWMutex + + conns map[string]connSetItem +} + +// NewConnSet returns a ConnSet implementation. +func NewConnSet() *connSet { + return &connSet{ + conns: map[string]connSetItem{}, + } +} + +func (cs *connSet) Has(c net.Conn) bool { + cs.RLock() + defer cs.RUnlock() + + _, ok := cs.conns[c.RemoteAddr().String()] + + return ok +} + +func (cs *connSet) HasIP(ip net.IP) bool { + cs.RLock() + defer cs.RUnlock() + + for _, c := range cs.conns { + for _, known := range c.ips { + if known.Equal(ip) { + return true + } + } + } + + return false +} + +func (cs *connSet) Remove(c net.Conn) { + cs.Lock() + defer cs.Unlock() + + delete(cs.conns, c.RemoteAddr().String()) +} + +func (cs *connSet) Set(c net.Conn, ips []net.IP) { + cs.Lock() + defer cs.Unlock() + + cs.conns[c.RemoteAddr().String()] = connSetItem{ + conn: c, + ips: ips, + } +} diff --git a/p2p/transport.go b/p2p/transport.go new file mode 100644 index 00000000..61cff55d --- /dev/null +++ b/p2p/transport.go @@ -0,0 +1,494 @@ +package p2p + +import ( + "context" + "fmt" + "net" + "time" + + "github.com/tendermint/tendermint/config" + crypto "github.com/tendermint/tendermint/crypto" + "github.com/tendermint/tendermint/p2p/conn" +) + +const ( + defaultDialTimeout = time.Second + defaultFilterTimeout = 5 * time.Second + defaultHandshakeTimeout = 3 * time.Second +) + +// IPResolver is a behaviour subset of net.Resolver. +type IPResolver interface { + LookupIPAddr(context.Context, string) ([]net.IPAddr, error) +} + +// accept is the container to carry the upgraded connection and NodeInfo from an +// asynchronously running routine to the Accept method. +type accept struct { + conn net.Conn + nodeInfo NodeInfo + err error +} + +// peerConfig is used to bundle data we need to fully setup a Peer with an +// MConn, provided by the caller of Accept and Dial (currently the Switch). This +// a temporary measure until reactor setup is less dynamic and we introduce the +// concept of PeerBehaviour to communicate about significant Peer lifecycle +// events. +// TODO(xla): Refactor out with more static Reactor setup and PeerBehaviour. +type peerConfig struct { + chDescs []*conn.ChannelDescriptor + onPeerError func(Peer, interface{}) + outbound, persistent bool + reactorsByCh map[byte]Reactor +} + +// Transport emits and connects to Peers. The implementation of Peer is left to +// the transport. Each transport is also responsible to filter establishing +// peers specific to its domain. +type Transport interface { + // Accept returns a newly connected Peer. + Accept(peerConfig) (Peer, error) + + // Dial connects to the Peer for the address. + Dial(NetAddress, peerConfig) (Peer, error) +} + +// transportLifecycle bundles the methods for callers to control start and stop +// behaviour. +type transportLifecycle interface { + Close() error + Listen(NetAddress) error +} + +// ConnFilterFunc to be implemented by filter hooks after a new connection has +// been established. The set of exisiting connections is passed along together +// with all resolved IPs for the new connection. +type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error + +// ConnDuplicateIPFilter resolves and keeps all ips for an incoming connection +// and refuses new ones if they come from a known ip. +func ConnDuplicateIPFilter() ConnFilterFunc { + return func(cs ConnSet, c net.Conn, ips []net.IP) error { + for _, ip := range ips { + if cs.HasIP(ip) { + return ErrRejected{ + conn: c, + err: fmt.Errorf("IP<%v> already connected", ip), + isDuplicate: true, + } + } + } + + return nil + } +} + +// MultiplexTransportOption sets an optional parameter on the +// MultiplexTransport. +type MultiplexTransportOption func(*MultiplexTransport) + +// MultiplexTransportConnFilters sets the filters for rejection new connections. +func MultiplexTransportConnFilters( + filters ...ConnFilterFunc, +) MultiplexTransportOption { + return func(mt *MultiplexTransport) { mt.connFilters = filters } +} + +// MultiplexTransportFilterTimeout sets the timeout waited for filter calls to +// return. +func MultiplexTransportFilterTimeout( + timeout time.Duration, +) MultiplexTransportOption { + return func(mt *MultiplexTransport) { mt.filterTimeout = timeout } +} + +// MultiplexTransportResolver sets the Resolver used for ip lokkups, defaults to +// net.DefaultResolver. +func MultiplexTransportResolver(resolver IPResolver) MultiplexTransportOption { + return func(mt *MultiplexTransport) { mt.resolver = resolver } +} + +// MultiplexTransport accepts and dials tcp connections and upgrades them to +// multiplexed peers. +type MultiplexTransport struct { + listener net.Listener + + acceptc chan accept + closec chan struct{} + + // Lookup table for duplicate ip and id checks. + conns ConnSet + connFilters []ConnFilterFunc + + dialTimeout time.Duration + filterTimeout time.Duration + handshakeTimeout time.Duration + nodeInfo NodeInfo + nodeKey NodeKey + resolver IPResolver + + // TODO(xla): Those configs are still needed as we parameterise peerConn and + // peer currently. All relevant configuration should be refactored into options + // with sane defaults. + mConfig conn.MConnConfig + p2pConfig config.P2PConfig +} + +// Test multiplexTransport for interface completeness. +var _ Transport = (*MultiplexTransport)(nil) +var _ transportLifecycle = (*MultiplexTransport)(nil) + +// NewMultiplexTransport returns a tcp connected multiplexed peer. +func NewMultiplexTransport( + nodeInfo NodeInfo, + nodeKey NodeKey, +) *MultiplexTransport { + return &MultiplexTransport{ + acceptc: make(chan accept), + closec: make(chan struct{}), + dialTimeout: defaultDialTimeout, + filterTimeout: defaultFilterTimeout, + handshakeTimeout: defaultHandshakeTimeout, + mConfig: conn.DefaultMConnConfig(), + nodeInfo: nodeInfo, + nodeKey: nodeKey, + conns: NewConnSet(), + resolver: net.DefaultResolver, + } +} + +// Accept implements Transport. +func (mt *MultiplexTransport) Accept(cfg peerConfig) (Peer, error) { + select { + // This case should never have any side-effectful/blocking operations to + // ensure that quality peers are ready to be used. + case a := <-mt.acceptc: + if a.err != nil { + return nil, a.err + } + + cfg.outbound = false + + return mt.wrapPeer(a.conn, a.nodeInfo, cfg), nil + case <-mt.closec: + return nil, &ErrTransportClosed{} + } +} + +// Dial implements Transport. +func (mt *MultiplexTransport) Dial( + addr NetAddress, + cfg peerConfig, +) (Peer, error) { + c, err := addr.DialTimeout(mt.dialTimeout) + if err != nil { + return nil, err + } + + // TODO(xla): Evaluate if we should apply filters if we explicitly dial. + if err := mt.filterConn(c); err != nil { + return nil, err + } + + secretConn, nodeInfo, err := mt.upgrade(c) + if err != nil { + return nil, err + } + + cfg.outbound = true + + p := mt.wrapPeer(secretConn, nodeInfo, cfg) + + return p, nil +} + +// Close implements transportLifecycle. +func (mt *MultiplexTransport) Close() error { + close(mt.closec) + + return mt.listener.Close() +} + +// Listen implements transportLifecycle. +func (mt *MultiplexTransport) Listen(addr NetAddress) error { + ln, err := net.Listen("tcp", addr.DialString()) + if err != nil { + return err + } + + mt.listener = ln + + go mt.acceptPeers() + + return nil +} + +func (mt *MultiplexTransport) acceptPeers() { + for { + c, err := mt.listener.Accept() + if err != nil { + // If Close() has been called, silently exit. + select { + case _, ok := <-mt.closec: + if !ok { + return + } + default: + // Transport is not closed + } + + mt.acceptc <- accept{err: err} + return + } + + // Connection upgrade and filtering should be asynchronous to avoid + // Head-of-line blocking[0]. + // Reference: https://github.com/tendermint/tendermint/issues/2047 + // + // [0] https://en.wikipedia.org/wiki/Head-of-line_blocking + go func(c net.Conn) { + var ( + nodeInfo NodeInfo + secretConn *conn.SecretConnection + ) + + err := mt.filterConn(c) + if err == nil { + secretConn, nodeInfo, err = mt.upgrade(c) + } + + select { + case mt.acceptc <- accept{secretConn, nodeInfo, err}: + // Make the upgraded peer available. + case <-mt.closec: + // Give up if the transport was closed. + _ = c.Close() + return + } + }(c) + } +} + +func (mt *MultiplexTransport) cleanup(c net.Conn) error { + mt.conns.Remove(c) + + return c.Close() +} + +func (mt *MultiplexTransport) filterConn(c net.Conn) (err error) { + defer func() { + if err != nil { + _ = c.Close() + } + }() + + // Reject if connection is already present. + if mt.conns.Has(c) { + return ErrRejected{conn: c, isDuplicate: true} + } + + // Resolve ips for incoming conn. + ips, err := resolveIPs(mt.resolver, c) + if err != nil { + return err + } + + errc := make(chan error, len(mt.connFilters)) + + for _, f := range mt.connFilters { + go func(f ConnFilterFunc, c net.Conn, ips []net.IP, errc chan<- error) { + errc <- f(mt.conns, c, ips) + }(f, c, ips, errc) + } + + for i := 0; i < cap(errc); i++ { + select { + case err := <-errc: + if err != nil { + return ErrRejected{conn: c, err: err, isFiltered: true} + } + case <-time.After(mt.filterTimeout): + return ErrFilterTimeout{} + } + + } + + mt.conns.Set(c, ips) + + return nil +} + +func (mt *MultiplexTransport) upgrade( + c net.Conn, +) (secretConn *conn.SecretConnection, nodeInfo NodeInfo, err error) { + defer func() { + if err != nil { + _ = mt.cleanup(c) + } + }() + + secretConn, err = upgradeSecretConn(c, mt.handshakeTimeout, mt.nodeKey.PrivKey) + if err != nil { + return nil, NodeInfo{}, ErrRejected{ + conn: c, + err: fmt.Errorf("secrect conn failed: %v", err), + isAuthFailure: true, + } + } + + nodeInfo, err = handshake(secretConn, mt.handshakeTimeout, mt.nodeInfo) + if err != nil { + return nil, NodeInfo{}, ErrRejected{ + conn: c, + err: fmt.Errorf("handshake failed: %v", err), + isAuthFailure: true, + } + } + + if err := nodeInfo.Validate(); err != nil { + return nil, NodeInfo{}, ErrRejected{ + conn: c, + err: err, + isNodeInfoInvalid: true, + } + } + + // Ensure connection key matches self reported key. + if connID := PubKeyToID(secretConn.RemotePubKey()); connID != nodeInfo.ID { + return nil, NodeInfo{}, ErrRejected{ + conn: c, + id: connID, + err: fmt.Errorf( + "conn.ID (%v) NodeInfo.ID (%v) missmatch", + connID, + nodeInfo.ID, + ), + isAuthFailure: true, + } + } + + // Reject self. + if mt.nodeInfo.ID == nodeInfo.ID { + return nil, NodeInfo{}, ErrRejected{ + addr: *NewNetAddress(nodeInfo.ID, c.RemoteAddr()), + conn: c, + id: nodeInfo.ID, + isSelf: true, + } + } + + if err := mt.nodeInfo.CompatibleWith(nodeInfo); err != nil { + return nil, NodeInfo{}, ErrRejected{ + conn: c, + err: err, + id: nodeInfo.ID, + isIncompatible: true, + } + } + + return secretConn, nodeInfo, nil +} + +func (mt *MultiplexTransport) wrapPeer( + c net.Conn, + ni NodeInfo, + cfg peerConfig, +) Peer { + p := newPeer( + peerConn{ + conn: c, + config: &mt.p2pConfig, + outbound: cfg.outbound, + persistent: cfg.persistent, + }, + mt.mConfig, + ni, + cfg.reactorsByCh, + cfg.chDescs, + cfg.onPeerError, + ) + + // Wait for Peer to Stop so we can cleanup. + go func(c net.Conn) { + <-p.Quit() + _ = mt.cleanup(c) + }(c) + + return p +} + +func handshake( + c net.Conn, + timeout time.Duration, + nodeInfo NodeInfo, +) (NodeInfo, error) { + if err := c.SetDeadline(time.Now().Add(timeout)); err != nil { + return NodeInfo{}, err + } + + var ( + errc = make(chan error, 2) + + peerNodeInfo NodeInfo + ) + + go func(errc chan<- error, c net.Conn) { + _, err := cdc.MarshalBinaryWriter(c, nodeInfo) + errc <- err + }(errc, c) + go func(errc chan<- error, c net.Conn) { + _, err := cdc.UnmarshalBinaryReader( + c, + &peerNodeInfo, + int64(MaxNodeInfoSize()), + ) + errc <- err + }(errc, c) + + for i := 0; i < cap(errc); i++ { + err := <-errc + if err != nil { + return NodeInfo{}, err + } + } + + return peerNodeInfo, c.SetDeadline(time.Time{}) +} + +func upgradeSecretConn( + c net.Conn, + timeout time.Duration, + privKey crypto.PrivKey, +) (*conn.SecretConnection, error) { + if err := c.SetDeadline(time.Now().Add(timeout)); err != nil { + return nil, err + } + + sc, err := conn.MakeSecretConnection(c, privKey) + if err != nil { + return nil, err + } + + return sc, sc.SetDeadline(time.Time{}) +} + +func resolveIPs(resolver IPResolver, c net.Conn) ([]net.IP, error) { + host, _, err := net.SplitHostPort(c.RemoteAddr().String()) + if err != nil { + return nil, err + } + + addrs, err := resolver.LookupIPAddr(context.Background(), host) + if err != nil { + return nil, err + } + + ips := []net.IP{} + + for _, addr := range addrs { + ips = append(ips, addr.IP) + } + + return ips, nil +} diff --git a/p2p/transport_test.go b/p2p/transport_test.go new file mode 100644 index 00000000..9e3cc467 --- /dev/null +++ b/p2p/transport_test.go @@ -0,0 +1,636 @@ +package p2p + +import ( + "fmt" + "math/rand" + "net" + "reflect" + "testing" + "time" + + "github.com/tendermint/tendermint/crypto/ed25519" +) + +func TestTransportMultiplexConnFilter(t *testing.T) { + mt := NewMultiplexTransport( + NodeInfo{}, + NodeKey{ + PrivKey: ed25519.GenPrivKey(), + }, + ) + + MultiplexTransportConnFilters( + func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil }, + func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil }, + func(_ ConnSet, _ net.Conn, _ []net.IP) error { + return fmt.Errorf("rejected") + }, + )(mt) + + addr, err := NewNetAddressStringWithOptionalID("127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + + if err := mt.Listen(*addr); err != nil { + t.Fatal(err) + } + + errc := make(chan error) + + go func() { + addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String()) + if err != nil { + errc <- err + return + } + + _, err = addr.Dial() + if err != nil { + errc <- err + return + } + + close(errc) + }() + + if err := <-errc; err != nil { + t.Errorf("connection failed: %v", err) + } + + _, err = mt.Accept(peerConfig{}) + if err, ok := err.(ErrRejected); ok { + if !err.IsFiltered() { + t.Errorf("expected peer to be filtered") + } + } else { + t.Errorf("expected ErrRejected") + } +} + +func TestTransportMultiplexConnFilterTimeout(t *testing.T) { + mt := NewMultiplexTransport( + NodeInfo{}, + NodeKey{ + PrivKey: ed25519.GenPrivKey(), + }, + ) + + MultiplexTransportFilterTimeout(5 * time.Millisecond)(mt) + MultiplexTransportConnFilters( + func(_ ConnSet, _ net.Conn, _ []net.IP) error { + time.Sleep(10 * time.Millisecond) + return nil + }, + )(mt) + + addr, err := NewNetAddressStringWithOptionalID("127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + + if err := mt.Listen(*addr); err != nil { + t.Fatal(err) + } + + errc := make(chan error) + + go func() { + addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String()) + if err != nil { + errc <- err + return + } + + _, err = addr.Dial() + if err != nil { + errc <- err + return + } + + close(errc) + }() + + if err := <-errc; err != nil { + t.Errorf("connection failed: %v", err) + } + + _, err = mt.Accept(peerConfig{}) + if _, ok := err.(ErrFilterTimeout); !ok { + t.Errorf("expected ErrFilterTimeout") + } +} +func TestTransportMultiplexAcceptMultiple(t *testing.T) { + mt := testSetupMultiplexTransport(t) + + var ( + seed = rand.New(rand.NewSource(time.Now().UnixNano())) + errc = make(chan error, seed.Intn(64)+64) + ) + + // Setup dialers. + for i := 0; i < cap(errc); i++ { + go func() { + var ( + pv = ed25519.GenPrivKey() + dialer = NewMultiplexTransport( + NodeInfo{ + ID: PubKeyToID(pv.PubKey()), + ListenAddr: "127.0.0.1:0", + Moniker: "dialer", + Version: "1.0.0", + }, + NodeKey{ + PrivKey: pv, + }, + ) + ) + + addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String()) + if err != nil { + errc <- err + return + } + + _, err = dialer.Dial(*addr, peerConfig{}) + if err != nil { + errc <- err + return + } + + // Signal that the connection was established. + errc <- nil + }() + } + + // Catch connection errors. + for i := 0; i < cap(errc); i++ { + if err := <-errc; err != nil { + t.Fatal(err) + } + } + + ps := []Peer{} + + // Accept all peers. + for i := 0; i < cap(errc); i++ { + p, err := mt.Accept(peerConfig{}) + if err != nil { + t.Fatal(err) + } + + if err := p.Start(); err != nil { + t.Fatal(err) + } + + ps = append(ps, p) + } + + if have, want := len(ps), cap(errc); have != want { + t.Errorf("have %v, want %v", have, want) + } + + // Stop all peers. + for _, p := range ps { + if err := p.Stop(); err != nil { + t.Fatal(err) + } + } + + if err := mt.Close(); err != nil { + t.Errorf("close errored: %v", err) + } +} + +func TestTransportMultiplexAcceptNonBlocking(t *testing.T) { + mt := testSetupMultiplexTransport(t) + + var ( + fastNodePV = ed25519.GenPrivKey() + fastNodeInfo = NodeInfo{ + ID: PubKeyToID(fastNodePV.PubKey()), + ListenAddr: "127.0.0.1:0", + Moniker: "fastNode", + Version: "1.0.0", + } + errc = make(chan error) + fastc = make(chan struct{}) + slowc = make(chan struct{}) + ) + + // Simulate slow Peer. + go func() { + addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String()) + if err != nil { + errc <- err + return + } + + c, err := addr.Dial() + if err != nil { + errc <- err + return + } + + close(slowc) + + select { + case <-fastc: + // Fast peer connected. + case <-time.After(50 * time.Millisecond): + // We error if the fast peer didn't succeed. + errc <- fmt.Errorf("Fast peer timed out") + } + + sc, err := upgradeSecretConn(c, 20*time.Millisecond, ed25519.GenPrivKey()) + if err != nil { + errc <- err + return + } + + _, err = handshake(sc, 20*time.Millisecond, NodeInfo{ + ID: PubKeyToID(ed25519.GenPrivKey().PubKey()), + ListenAddr: "127.0.0.1:0", + Moniker: "slow_peer", + }) + if err != nil { + errc <- err + return + } + }() + + // Simulate fast Peer. + go func() { + <-slowc + + var ( + dialer = NewMultiplexTransport( + fastNodeInfo, + NodeKey{ + PrivKey: fastNodePV, + }, + ) + ) + + addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String()) + if err != nil { + errc <- err + return + } + + _, err = dialer.Dial(*addr, peerConfig{}) + if err != nil { + errc <- err + return + } + + close(errc) + close(fastc) + }() + + if err := <-errc; err != nil { + t.Errorf("connection failed: %v", err) + } + + p, err := mt.Accept(peerConfig{}) + if err != nil { + t.Fatal(err) + } + + if have, want := p.NodeInfo(), fastNodeInfo; !reflect.DeepEqual(have, want) { + t.Errorf("have %v, want %v", have, want) + } +} + +func TestTransportMultiplexValidateNodeInfo(t *testing.T) { + mt := testSetupMultiplexTransport(t) + + errc := make(chan error) + + go func() { + var ( + pv = ed25519.GenPrivKey() + dialer = NewMultiplexTransport( + NodeInfo{ + ID: PubKeyToID(pv.PubKey()), + ListenAddr: "127.0.0.1:0", + Moniker: "", // Should not be empty. + Version: "1.0.0", + }, + NodeKey{ + PrivKey: pv, + }, + ) + ) + + addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String()) + if err != nil { + errc <- err + return + } + + _, err = dialer.Dial(*addr, peerConfig{}) + if err != nil { + errc <- err + return + } + + close(errc) + }() + + if err := <-errc; err != nil { + t.Errorf("connection failed: %v", err) + } + + _, err := mt.Accept(peerConfig{}) + if err, ok := err.(ErrRejected); ok { + if !err.IsNodeInfoInvalid() { + t.Errorf("expected NodeInfo to be invalid") + } + } else { + t.Errorf("expected ErrRejected") + } +} + +func TestTransportMultiplexRejectMissmatchID(t *testing.T) { + mt := testSetupMultiplexTransport(t) + + errc := make(chan error) + + go func() { + dialer := NewMultiplexTransport( + NodeInfo{ + ID: PubKeyToID(ed25519.GenPrivKey().PubKey()), + ListenAddr: "127.0.0.1:0", + Moniker: "dialer", + Version: "1.0.0", + }, + NodeKey{ + PrivKey: ed25519.GenPrivKey(), + }, + ) + + addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String()) + if err != nil { + errc <- err + return + } + + _, err = dialer.Dial(*addr, peerConfig{}) + if err != nil { + errc <- err + return + } + + close(errc) + }() + + if err := <-errc; err != nil { + t.Errorf("connection failed: %v", err) + } + + _, err := mt.Accept(peerConfig{}) + if err, ok := err.(ErrRejected); ok { + if !err.IsAuthFailure() { + t.Errorf("expected auth failure") + } + } else { + t.Errorf("expected ErrRejected") + } +} + +func TestTransportMultiplexRejectIncompatible(t *testing.T) { + mt := testSetupMultiplexTransport(t) + + errc := make(chan error) + + go func() { + var ( + pv = ed25519.GenPrivKey() + dialer = NewMultiplexTransport( + NodeInfo{ + ID: PubKeyToID(pv.PubKey()), + ListenAddr: "127.0.0.1:0", + Moniker: "dialer", + Version: "2.0.0", + }, + NodeKey{ + PrivKey: pv, + }, + ) + ) + + addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String()) + if err != nil { + errc <- err + return + } + + _, err = dialer.Dial(*addr, peerConfig{}) + if err != nil { + errc <- err + return + } + + close(errc) + }() + + _, err := mt.Accept(peerConfig{}) + if err, ok := err.(ErrRejected); ok { + if !err.IsIncompatible() { + t.Errorf("expected to reject incompatible") + } + } else { + t.Errorf("expected ErrRejected") + } +} + +func TestTransportMultiplexRejectSelf(t *testing.T) { + mt := testSetupMultiplexTransport(t) + + errc := make(chan error) + + go func() { + addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String()) + if err != nil { + errc <- err + return + } + + _, err = mt.Dial(*addr, peerConfig{}) + if err != nil { + errc <- err + return + } + + close(errc) + }() + + if err := <-errc; err != nil { + if err, ok := err.(ErrRejected); ok { + if !err.IsSelf() { + t.Errorf("expected to reject self") + } + } else { + t.Errorf("expected ErrRejected") + } + } else { + t.Errorf("expected connection failure") + } + + _, err := mt.Accept(peerConfig{}) + if err, ok := err.(ErrRejected); ok { + if !err.IsSelf() { + t.Errorf("expected to reject self") + } + } else { + t.Errorf("expected ErrRejected") + } +} + +func TestTransportConnDuplicateIPFilter(t *testing.T) { + filter := ConnDuplicateIPFilter() + + if err := filter(nil, &testTransportConn{}, nil); err != nil { + t.Fatal(err) + } + + var ( + c = &testTransportConn{} + cs = NewConnSet() + ) + + cs.Set(c, []net.IP{ + net.IP{10, 0, 10, 1}, + net.IP{10, 0, 10, 2}, + net.IP{10, 0, 10, 3}, + }) + + if err := filter(cs, c, []net.IP{ + net.IP{10, 0, 10, 2}, + }); err == nil { + t.Errorf("expected Peer to be rejected as duplicate") + } +} + +func TestTransportHandshake(t *testing.T) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + + var ( + peerPV = ed25519.GenPrivKey() + peerNodeInfo = NodeInfo{ + ID: PubKeyToID(peerPV.PubKey()), + } + ) + + go func() { + c, err := net.Dial(ln.Addr().Network(), ln.Addr().String()) + if err != nil { + t.Error(err) + return + } + + go func(c net.Conn) { + _, err := cdc.MarshalBinaryWriter(c, peerNodeInfo) + if err != nil { + t.Error(err) + } + }(c) + go func(c net.Conn) { + ni := NodeInfo{} + + _, err := cdc.UnmarshalBinaryReader( + c, + &ni, + int64(MaxNodeInfoSize()), + ) + if err != nil { + t.Error(err) + } + }(c) + }() + + c, err := ln.Accept() + if err != nil { + t.Fatal(err) + } + + ni, err := handshake(c, 20*time.Millisecond, NodeInfo{}) + if err != nil { + t.Fatal(err) + } + + if have, want := ni, peerNodeInfo; !reflect.DeepEqual(have, want) { + t.Errorf("have %v, want %v", have, want) + } +} + +func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport { + var ( + pv = ed25519.GenPrivKey() + mt = NewMultiplexTransport( + NodeInfo{ + ID: PubKeyToID(pv.PubKey()), + ListenAddr: "127.0.0.1:0", + Moniker: "transport", + Version: "1.0.0", + }, + NodeKey{ + PrivKey: pv, + }, + ) + ) + + addr, err := NewNetAddressStringWithOptionalID("127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + + if err := mt.Listen(*addr); err != nil { + t.Fatal(err) + } + + return mt +} + +type testTransportAddr struct{} + +func (a *testTransportAddr) Network() string { return "tcp" } +func (a *testTransportAddr) String() string { return "test.local:1234" } + +type testTransportConn struct{} + +func (c *testTransportConn) Close() error { + return fmt.Errorf("Close() not implemented") +} + +func (c *testTransportConn) LocalAddr() net.Addr { + return &testTransportAddr{} +} + +func (c *testTransportConn) RemoteAddr() net.Addr { + return &testTransportAddr{} +} + +func (c *testTransportConn) Read(_ []byte) (int, error) { + return -1, fmt.Errorf("Read() not implemented") +} + +func (c *testTransportConn) SetDeadline(_ time.Time) error { + return fmt.Errorf("SetDeadline() not implemented") +} + +func (c *testTransportConn) SetReadDeadline(_ time.Time) error { + return fmt.Errorf("SetReadDeadline() not implemented") +} + +func (c *testTransportConn) SetWriteDeadline(_ time.Time) error { + return fmt.Errorf("SetWriteDeadline() not implemented") +} + +func (c *testTransportConn) Write(_ []byte) (int, error) { + return -1, fmt.Errorf("Write() not implemented") +}