diff --git a/p2p/conn_set.go b/p2p/conn_set.go
new file mode 100644
index 00000000..f960c0e8
--- /dev/null
+++ b/p2p/conn_set.go
@@ -0,0 +1,73 @@
+package p2p
+
+import (
+	"net"
+	"sync"
+)
+
+// ConnSet is a lookup table for connections and all their ips.
+type ConnSet interface {
+	Has(net.Conn) bool
+	HasIP(net.IP) bool
+	Set(net.Conn, []net.IP)
+	Remove(net.Conn)
+}
+
+type connSetItem struct {
+	conn net.Conn
+	ips  []net.IP
+}
+
+type connSet struct {
+	sync.RWMutex
+
+	conns map[string]connSetItem
+}
+
+// NewConnSet returns a ConnSet implementation.
+func NewConnSet() *connSet {
+	return &connSet{
+		conns: map[string]connSetItem{},
+	}
+}
+
+func (cs *connSet) Has(c net.Conn) bool {
+	cs.RLock()
+	defer cs.RUnlock()
+
+	_, ok := cs.conns[c.RemoteAddr().String()]
+
+	return ok
+}
+
+func (cs *connSet) HasIP(ip net.IP) bool {
+	cs.RLock()
+	defer cs.RUnlock()
+
+	for _, c := range cs.conns {
+		for _, known := range c.ips {
+			if known.Equal(ip) {
+				return true
+			}
+		}
+	}
+
+	return false
+}
+
+func (cs *connSet) Remove(c net.Conn) {
+	cs.Lock()
+	defer cs.Unlock()
+
+	delete(cs.conns, c.RemoteAddr().String())
+}
+
+func (cs *connSet) Set(c net.Conn, ips []net.IP) {
+	cs.Lock()
+	defer cs.Unlock()
+
+	cs.conns[c.RemoteAddr().String()] = connSetItem{
+		conn: c,
+		ips:  ips,
+	}
+}
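For context, a minimal sketch of how ConnSet is meant to be used (not part of the change set itself): it assumes the conn_set.go code above, uses net.Pipe purely as a stand-in connection, and ExampleConnSet is an invented name.

package p2p

import (
	"fmt"
	"net"
)

// ExampleConnSet shows the bookkeeping the transport relies on: a connection is
// stored under its remote address together with all of its resolved IPs, and
// can then be looked up either by the conn itself or by any single IP.
func ExampleConnSet() {
	cs := NewConnSet()

	client, server := net.Pipe() // stand-in connection, illustration only
	defer client.Close()
	defer server.Close()

	cs.Set(server, []net.IP{net.ParseIP("10.0.10.1"), net.ParseIP("10.0.10.2")})

	fmt.Println(cs.Has(server))
	fmt.Println(cs.HasIP(net.ParseIP("10.0.10.2")))

	cs.Remove(server)
	fmt.Println(cs.Has(server))

	// Output:
	// true
	// true
	// false
}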
diff --git a/p2p/transport.go b/p2p/transport.go
new file mode 100644
index 00000000..61cff55d
--- /dev/null
+++ b/p2p/transport.go
@@ -0,0 +1,494 @@
+package p2p
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"time"
+
+	"github.com/tendermint/tendermint/config"
+	crypto "github.com/tendermint/tendermint/crypto"
+	"github.com/tendermint/tendermint/p2p/conn"
+)
+
+const (
+	defaultDialTimeout      = time.Second
+	defaultFilterTimeout    = 5 * time.Second
+	defaultHandshakeTimeout = 3 * time.Second
+)
+
+// IPResolver is a behaviour subset of net.Resolver.
+type IPResolver interface {
+	LookupIPAddr(context.Context, string) ([]net.IPAddr, error)
+}
+
+// accept is the container to carry the upgraded connection and NodeInfo from an
+// asynchronously running routine to the Accept method.
+type accept struct {
+	conn     net.Conn
+	nodeInfo NodeInfo
+	err      error
+}
+
+// peerConfig is used to bundle data we need to fully set up a Peer with an
+// MConn, provided by the caller of Accept and Dial (currently the Switch). This
+// is a temporary measure until reactor setup is less dynamic and we introduce
+// the concept of PeerBehaviour to communicate about significant Peer lifecycle
+// events.
+// TODO(xla): Refactor out with more static Reactor setup and PeerBehaviour.
+type peerConfig struct {
+	chDescs              []*conn.ChannelDescriptor
+	onPeerError          func(Peer, interface{})
+	outbound, persistent bool
+	reactorsByCh         map[byte]Reactor
+}
+
+// Transport emits and connects to Peers. The implementation of Peer is left to
+// the transport. Each transport is also responsible for filtering establishing
+// peers specific to its domain.
+type Transport interface {
+	// Accept returns a newly connected Peer.
+	Accept(peerConfig) (Peer, error)
+
+	// Dial connects to the Peer for the address.
+	Dial(NetAddress, peerConfig) (Peer, error)
+}
+
+// transportLifecycle bundles the methods for callers to control start and stop
+// behaviour.
+type transportLifecycle interface {
+	Close() error
+	Listen(NetAddress) error
+}
+
+// ConnFilterFunc is implemented by filter hooks which run after a new connection
+// has been established. The set of existing connections is passed along together
+// with all resolved IPs for the new connection.
+type ConnFilterFunc func(ConnSet, net.Conn, []net.IP) error
+
+// ConnDuplicateIPFilter resolves and keeps all IPs for an incoming connection
+// and refuses new ones if they come from a known IP.
+func ConnDuplicateIPFilter() ConnFilterFunc {
+	return func(cs ConnSet, c net.Conn, ips []net.IP) error {
+		for _, ip := range ips {
+			if cs.HasIP(ip) {
+				return ErrRejected{
+					conn:        c,
+					err:         fmt.Errorf("IP<%v> already connected", ip),
+					isDuplicate: true,
+				}
+			}
+		}
+
+		return nil
+	}
+}
+
+// MultiplexTransportOption sets an optional parameter on the
+// MultiplexTransport.
+type MultiplexTransportOption func(*MultiplexTransport)
+
+// MultiplexTransportConnFilters sets the filters for rejecting new connections.
+func MultiplexTransportConnFilters(
+	filters ...ConnFilterFunc,
+) MultiplexTransportOption {
+	return func(mt *MultiplexTransport) { mt.connFilters = filters }
+}
+
+// MultiplexTransportFilterTimeout sets the timeout waited for filter calls to
+// return.
+func MultiplexTransportFilterTimeout(
+	timeout time.Duration,
+) MultiplexTransportOption {
+	return func(mt *MultiplexTransport) { mt.filterTimeout = timeout }
+}
+
+// MultiplexTransportResolver sets the Resolver used for IP lookups, defaults to
+// net.DefaultResolver.
+func MultiplexTransportResolver(resolver IPResolver) MultiplexTransportOption {
+	return func(mt *MultiplexTransport) { mt.resolver = resolver }
+}
+
+// MultiplexTransport accepts and dials TCP connections and upgrades them to
+// multiplexed peers.
+type MultiplexTransport struct {
+	listener net.Listener
+
+	acceptc chan accept
+	closec  chan struct{}
+
+	// Lookup table for duplicate IP and ID checks.
+	conns       ConnSet
+	connFilters []ConnFilterFunc
+
+	dialTimeout      time.Duration
+	filterTimeout    time.Duration
+	handshakeTimeout time.Duration
+	nodeInfo         NodeInfo
+	nodeKey          NodeKey
+	resolver         IPResolver
+
+	// TODO(xla): Those configs are still needed as we parameterise peerConn and
+	// peer currently. All relevant configuration should be refactored into options
+	// with sane defaults.
+	mConfig   conn.MConnConfig
+	p2pConfig config.P2PConfig
+}
+
+// Test MultiplexTransport for interface completeness.
+var _ Transport = (*MultiplexTransport)(nil)
+var _ transportLifecycle = (*MultiplexTransport)(nil)
+
+// NewMultiplexTransport returns a TCP-based transport for multiplexed peers.
+func NewMultiplexTransport(
+	nodeInfo NodeInfo,
+	nodeKey NodeKey,
+) *MultiplexTransport {
+	return &MultiplexTransport{
+		acceptc:          make(chan accept),
+		closec:           make(chan struct{}),
+		dialTimeout:      defaultDialTimeout,
+		filterTimeout:    defaultFilterTimeout,
+		handshakeTimeout: defaultHandshakeTimeout,
+		mConfig:          conn.DefaultMConnConfig(),
+		nodeInfo:         nodeInfo,
+		nodeKey:          nodeKey,
+		conns:            NewConnSet(),
+		resolver:         net.DefaultResolver,
+	}
+}
+
+// Accept implements Transport.
+func (mt *MultiplexTransport) Accept(cfg peerConfig) (Peer, error) {
+	select {
+	// This case should never have any side-effectful/blocking operations to
+	// ensure that quality peers are ready to be used.
+	case a := <-mt.acceptc:
+		if a.err != nil {
+			return nil, a.err
+		}
+
+		cfg.outbound = false
+
+		return mt.wrapPeer(a.conn, a.nodeInfo, cfg), nil
+	case <-mt.closec:
+		return nil, &ErrTransportClosed{}
+	}
+}
+
+// Dial implements Transport.
+func (mt *MultiplexTransport) Dial(
+	addr NetAddress,
+	cfg peerConfig,
+) (Peer, error) {
+	c, err := addr.DialTimeout(mt.dialTimeout)
+	if err != nil {
+		return nil, err
+	}
+
+	// TODO(xla): Evaluate if we should apply filters if we explicitly dial.
+	if err := mt.filterConn(c); err != nil {
+		return nil, err
+	}
+
+	secretConn, nodeInfo, err := mt.upgrade(c)
+	if err != nil {
+		return nil, err
+	}
+
+	cfg.outbound = true
+
+	p := mt.wrapPeer(secretConn, nodeInfo, cfg)
+
+	return p, nil
+}
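A short sketch of how a caller could assemble a transport from the options above (not part of the change set); it assumes a NodeInfo and NodeKey are already at hand, and newFilteredTransport is an invented helper name.

package p2p

import (
	"net"
	"time"
)

// newFilteredTransport wires the functional options together; each option is a
// function that mutates the transport, so it is applied by calling it with mt.
func newFilteredTransport(ni NodeInfo, nk NodeKey) *MultiplexTransport {
	mt := NewMultiplexTransport(ni, nk)

	// Reject peers whose resolved IPs are already connected, give filters one
	// second to answer, and resolve IPs with the default resolver.
	MultiplexTransportConnFilters(ConnDuplicateIPFilter())(mt)
	MultiplexTransportFilterTimeout(time.Second)(mt)
	MultiplexTransportResolver(net.DefaultResolver)(mt)

	return mt
}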
+
+// Close implements transportLifecycle.
+func (mt *MultiplexTransport) Close() error {
+	close(mt.closec)
+
+	return mt.listener.Close()
+}
+
+// Listen implements transportLifecycle.
+func (mt *MultiplexTransport) Listen(addr NetAddress) error {
+	ln, err := net.Listen("tcp", addr.DialString())
+	if err != nil {
+		return err
+	}
+
+	mt.listener = ln
+
+	go mt.acceptPeers()
+
+	return nil
+}
+
+func (mt *MultiplexTransport) acceptPeers() {
+	for {
+		c, err := mt.listener.Accept()
+		if err != nil {
+			// If Close() has been called, silently exit.
+			select {
+			case _, ok := <-mt.closec:
+				if !ok {
+					return
+				}
+			default:
+				// Transport is not closed
+			}
+
+			mt.acceptc <- accept{err: err}
+			return
+		}
+
+		// Connection upgrade and filtering should be asynchronous to avoid
+		// Head-of-line blocking[0].
+		// Reference: https://github.com/tendermint/tendermint/issues/2047
+		//
+		// [0] https://en.wikipedia.org/wiki/Head-of-line_blocking
+		go func(c net.Conn) {
+			var (
+				nodeInfo   NodeInfo
+				secretConn *conn.SecretConnection
+			)
+
+			err := mt.filterConn(c)
+			if err == nil {
+				secretConn, nodeInfo, err = mt.upgrade(c)
+			}
+
+			select {
+			case mt.acceptc <- accept{secretConn, nodeInfo, err}:
+				// Make the upgraded peer available.
+			case <-mt.closec:
+				// Give up if the transport was closed.
+				_ = c.Close()
+				return
+			}
+		}(c)
+	}
+}
+
+func (mt *MultiplexTransport) cleanup(c net.Conn) error {
+	mt.conns.Remove(c)
+
+	return c.Close()
+}
+
+func (mt *MultiplexTransport) filterConn(c net.Conn) (err error) {
+	defer func() {
+		if err != nil {
+			_ = c.Close()
+		}
+	}()
+
+	// Reject if connection is already present.
+	if mt.conns.Has(c) {
+		return ErrRejected{conn: c, isDuplicate: true}
+	}
+
+	// Resolve IPs for incoming conn.
+	ips, err := resolveIPs(mt.resolver, c)
+	if err != nil {
+		return err
+	}
+
+	errc := make(chan error, len(mt.connFilters))
+
+	for _, f := range mt.connFilters {
+		go func(f ConnFilterFunc, c net.Conn, ips []net.IP, errc chan<- error) {
+			errc <- f(mt.conns, c, ips)
+		}(f, c, ips, errc)
+	}
+
+	for i := 0; i < cap(errc); i++ {
+		select {
+		case err := <-errc:
+			if err != nil {
+				return ErrRejected{conn: c, err: err, isFiltered: true}
+			}
+		case <-time.After(mt.filterTimeout):
+			return ErrFilterTimeout{}
+		}
+
+	}
+
+	mt.conns.Set(c, ips)
+
+	return nil
+}
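filterConn above runs every registered ConnFilterFunc in its own goroutine and gives them filterTimeout in total, so a filter only needs to return an error to reject the connection. A hedged sketch of an additional filter under that contract (ConnDenyCIDRFilter is invented and not part of the package):

package p2p

import (
	"fmt"
	"net"
)

// ConnDenyCIDRFilter rejects peers whose resolved IPs fall into any of the
// given CIDR blocks. The plain error it returns is wrapped into ErrRejected
// with isFiltered set by filterConn.
func ConnDenyCIDRFilter(blocked []*net.IPNet) ConnFilterFunc {
	return func(_ ConnSet, c net.Conn, ips []net.IP) error {
		for _, ip := range ips {
			for _, block := range blocked {
				if block.Contains(ip) {
					return fmt.Errorf("IP<%v> is in blocked range %v", ip, block)
				}
			}
		}

		return nil
	}
}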
+
+func (mt *MultiplexTransport) upgrade(
+	c net.Conn,
+) (secretConn *conn.SecretConnection, nodeInfo NodeInfo, err error) {
+	defer func() {
+		if err != nil {
+			_ = mt.cleanup(c)
+		}
+	}()
+
+	secretConn, err = upgradeSecretConn(c, mt.handshakeTimeout, mt.nodeKey.PrivKey)
+	if err != nil {
+		return nil, NodeInfo{}, ErrRejected{
+			conn:          c,
+			err:           fmt.Errorf("secret conn failed: %v", err),
+			isAuthFailure: true,
+		}
+	}
+
+	nodeInfo, err = handshake(secretConn, mt.handshakeTimeout, mt.nodeInfo)
+	if err != nil {
+		return nil, NodeInfo{}, ErrRejected{
+			conn:          c,
+			err:           fmt.Errorf("handshake failed: %v", err),
+			isAuthFailure: true,
+		}
+	}
+
+	if err := nodeInfo.Validate(); err != nil {
+		return nil, NodeInfo{}, ErrRejected{
+			conn:              c,
+			err:               err,
+			isNodeInfoInvalid: true,
+		}
+	}
+
+	// Ensure connection key matches self-reported key.
+	if connID := PubKeyToID(secretConn.RemotePubKey()); connID != nodeInfo.ID {
+		return nil, NodeInfo{}, ErrRejected{
+			conn: c,
+			id:   connID,
+			err: fmt.Errorf(
+				"conn.ID (%v) NodeInfo.ID (%v) mismatch",
+				connID,
+				nodeInfo.ID,
+			),
+			isAuthFailure: true,
+		}
+	}
+
+	// Reject self.
+	if mt.nodeInfo.ID == nodeInfo.ID {
+		return nil, NodeInfo{}, ErrRejected{
+			addr:   *NewNetAddress(nodeInfo.ID, c.RemoteAddr()),
+			conn:   c,
+			id:     nodeInfo.ID,
+			isSelf: true,
+		}
+	}
+
+	if err := mt.nodeInfo.CompatibleWith(nodeInfo); err != nil {
+		return nil, NodeInfo{}, ErrRejected{
+			conn:           c,
+			err:            err,
+			id:             nodeInfo.ID,
+			isIncompatible: true,
+		}
+	}
+
+	return secretConn, nodeInfo, nil
+}
+
+func (mt *MultiplexTransport) wrapPeer(
+	c net.Conn,
+	ni NodeInfo,
+	cfg peerConfig,
+) Peer {
+	p := newPeer(
+		peerConn{
+			conn:       c,
+			config:     &mt.p2pConfig,
+			outbound:   cfg.outbound,
+			persistent: cfg.persistent,
+		},
+		mt.mConfig,
+		ni,
+		cfg.reactorsByCh,
+		cfg.chDescs,
+		cfg.onPeerError,
+	)
+
+	// Wait for Peer to Stop so we can clean up.
+	go func(c net.Conn) {
+		<-p.Quit()
+		_ = mt.cleanup(c)
+	}(c)
+
+	return p
+}
+
+func handshake(
+	c net.Conn,
+	timeout time.Duration,
+	nodeInfo NodeInfo,
+) (NodeInfo, error) {
+	if err := c.SetDeadline(time.Now().Add(timeout)); err != nil {
+		return NodeInfo{}, err
+	}
+
+	var (
+		errc = make(chan error, 2)
+
+		peerNodeInfo NodeInfo
+	)
+
+	go func(errc chan<- error, c net.Conn) {
+		_, err := cdc.MarshalBinaryWriter(c, nodeInfo)
+		errc <- err
+	}(errc, c)
+	go func(errc chan<- error, c net.Conn) {
+		_, err := cdc.UnmarshalBinaryReader(
+			c,
+			&peerNodeInfo,
+			int64(MaxNodeInfoSize()),
+		)
+		errc <- err
+	}(errc, c)
+
+	for i := 0; i < cap(errc); i++ {
+		err := <-errc
+		if err != nil {
+			return NodeInfo{}, err
+		}
+	}
+
+	return peerNodeInfo, c.SetDeadline(time.Time{})
+}
+
+func upgradeSecretConn(
+	c net.Conn,
+	timeout time.Duration,
+	privKey crypto.PrivKey,
+) (*conn.SecretConnection, error) {
+	if err := c.SetDeadline(time.Now().Add(timeout)); err != nil {
+		return nil, err
+	}
+
+	sc, err := conn.MakeSecretConnection(c, privKey)
+	if err != nil {
+		return nil, err
+	}
+
+	return sc, sc.SetDeadline(time.Time{})
+}
+
+func resolveIPs(resolver IPResolver, c net.Conn) ([]net.IP, error) {
+	host, _, err := net.SplitHostPort(c.RemoteAddr().String())
+	if err != nil {
+		return nil, err
+	}
+
+	addrs, err := resolver.LookupIPAddr(context.Background(), host)
+	if err != nil {
+		return nil, err
+	}
+
+	ips := []net.IP{}
+
+	for _, addr := range addrs {
+		ips = append(ips, addr.IP)
+	}
+
+	return ips, nil
+}
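An end-to-end sketch of how a caller such as the Switch might drive the transport above (not part of the change set); runTransport, the empty peerConfig values, and the logging are placeholders for illustration.

package p2p

import "log"

// runTransport sketches the expected lifecycle: Listen starts acceptPeers,
// Accept hands out fully filtered and upgraded inbound peers, Dial does the
// same work synchronously for outbound connections, and Close tears down.
func runTransport(mt *MultiplexTransport, listenAddr, dialAddr NetAddress) {
	if err := mt.Listen(listenAddr); err != nil {
		log.Fatal(err)
	}
	defer mt.Close()

	// Inbound peers.
	go func() {
		for {
			p, err := mt.Accept(peerConfig{})
			if err != nil {
				// A rejected peer is expected; any other error (for example a
				// closed transport) ends the loop.
				if _, ok := err.(ErrRejected); ok {
					log.Printf("inbound peer rejected: %v", err)
					continue
				}
				return
			}
			log.Printf("accepted peer %v", p.NodeInfo().ID)
		}
	}()

	// Outbound peer.
	if p, err := mt.Dial(dialAddr, peerConfig{}); err == nil {
		log.Printf("dialed peer %v", p.NodeInfo().ID)
	}
}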
diff --git a/p2p/transport_test.go b/p2p/transport_test.go
new file mode 100644
index 00000000..9e3cc467
--- /dev/null
+++ b/p2p/transport_test.go
@@ -0,0 +1,636 @@
+package p2p
+
+import (
+	"fmt"
+	"math/rand"
+	"net"
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/tendermint/tendermint/crypto/ed25519"
+)
+
+func TestTransportMultiplexConnFilter(t *testing.T) {
+	mt := NewMultiplexTransport(
+		NodeInfo{},
+		NodeKey{
+			PrivKey: ed25519.GenPrivKey(),
+		},
+	)
+
+	MultiplexTransportConnFilters(
+		func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil },
+		func(_ ConnSet, _ net.Conn, _ []net.IP) error { return nil },
+		func(_ ConnSet, _ net.Conn, _ []net.IP) error {
+			return fmt.Errorf("rejected")
+		},
+	)(mt)
+
+	addr, err := NewNetAddressStringWithOptionalID("127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := mt.Listen(*addr); err != nil {
+		t.Fatal(err)
+	}
+
+	errc := make(chan error)
+
+	go func() {
+		addr, err :=
+			NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		_, err = addr.Dial()
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		close(errc)
+	}()
+
+	if err := <-errc; err != nil {
+		t.Errorf("connection failed: %v", err)
+	}
+
+	_, err = mt.Accept(peerConfig{})
+	if err, ok := err.(ErrRejected); ok {
+		if !err.IsFiltered() {
+			t.Errorf("expected peer to be filtered")
+		}
+	} else {
+		t.Errorf("expected ErrRejected")
+	}
+}
+
+func TestTransportMultiplexConnFilterTimeout(t *testing.T) {
+	mt := NewMultiplexTransport(
+		NodeInfo{},
+		NodeKey{
+			PrivKey: ed25519.GenPrivKey(),
+		},
+	)
+
+	MultiplexTransportFilterTimeout(5 * time.Millisecond)(mt)
+	MultiplexTransportConnFilters(
+		func(_ ConnSet, _ net.Conn, _ []net.IP) error {
+			time.Sleep(10 * time.Millisecond)
+			return nil
+		},
+	)(mt)
+
+	addr, err := NewNetAddressStringWithOptionalID("127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := mt.Listen(*addr); err != nil {
+		t.Fatal(err)
+	}
+
+	errc := make(chan error)
+
+	go func() {
+		addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		_, err = addr.Dial()
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		close(errc)
+	}()
+
+	if err := <-errc; err != nil {
+		t.Errorf("connection failed: %v", err)
+	}
+
+	_, err = mt.Accept(peerConfig{})
+	if _, ok := err.(ErrFilterTimeout); !ok {
+		t.Errorf("expected ErrFilterTimeout")
+	}
+}
+func TestTransportMultiplexAcceptMultiple(t *testing.T) {
+	mt := testSetupMultiplexTransport(t)
+
+	var (
+		seed = rand.New(rand.NewSource(time.Now().UnixNano()))
+		errc = make(chan error, seed.Intn(64)+64)
+	)
+
+	// Setup dialers.
+	for i := 0; i < cap(errc); i++ {
+		go func() {
+			var (
+				pv     = ed25519.GenPrivKey()
+				dialer = NewMultiplexTransport(
+					NodeInfo{
+						ID:         PubKeyToID(pv.PubKey()),
+						ListenAddr: "127.0.0.1:0",
+						Moniker:    "dialer",
+						Version:    "1.0.0",
+					},
+					NodeKey{
+						PrivKey: pv,
+					},
+				)
+			)
+
+			addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
+			if err != nil {
+				errc <- err
+				return
+			}
+
+			_, err = dialer.Dial(*addr, peerConfig{})
+			if err != nil {
+				errc <- err
+				return
+			}
+
+			// Signal that the connection was established.
+			errc <- nil
+		}()
+	}
+
+	// Catch connection errors.
+	for i := 0; i < cap(errc); i++ {
+		if err := <-errc; err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	ps := []Peer{}
+
+	// Accept all peers.
+	for i := 0; i < cap(errc); i++ {
+		p, err := mt.Accept(peerConfig{})
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if err := p.Start(); err != nil {
+			t.Fatal(err)
+		}
+
+		ps = append(ps, p)
+	}
+
+	if have, want := len(ps), cap(errc); have != want {
+		t.Errorf("have %v, want %v", have, want)
+	}
+
+	// Stop all peers.
+	for _, p := range ps {
+		if err := p.Stop(); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	if err := mt.Close(); err != nil {
+		t.Errorf("close errored: %v", err)
+	}
+}
+
+func TestTransportMultiplexAcceptNonBlocking(t *testing.T) {
+	mt := testSetupMultiplexTransport(t)
+
+	var (
+		fastNodePV   = ed25519.GenPrivKey()
+		fastNodeInfo = NodeInfo{
+			ID:         PubKeyToID(fastNodePV.PubKey()),
+			ListenAddr: "127.0.0.1:0",
+			Moniker:    "fastNode",
+			Version:    "1.0.0",
+		}
+		errc  = make(chan error)
+		fastc = make(chan struct{})
+		slowc = make(chan struct{})
+	)
+
+	// Simulate slow Peer.
+	go func() {
+		addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		c, err := addr.Dial()
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		close(slowc)
+
+		select {
+		case <-fastc:
+			// Fast peer connected.
+		case <-time.After(50 * time.Millisecond):
+			// We error if the fast peer didn't succeed.
+			errc <- fmt.Errorf("Fast peer timed out")
+		}
+
+		sc, err := upgradeSecretConn(c, 20*time.Millisecond, ed25519.GenPrivKey())
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		_, err = handshake(sc, 20*time.Millisecond, NodeInfo{
+			ID:         PubKeyToID(ed25519.GenPrivKey().PubKey()),
+			ListenAddr: "127.0.0.1:0",
+			Moniker:    "slow_peer",
+		})
+		if err != nil {
+			errc <- err
+			return
+		}
+	}()
+
+	// Simulate fast Peer.
+	go func() {
+		<-slowc
+
+		var (
+			dialer = NewMultiplexTransport(
+				fastNodeInfo,
+				NodeKey{
+					PrivKey: fastNodePV,
+				},
+			)
+		)
+
+		addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		_, err = dialer.Dial(*addr, peerConfig{})
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		close(errc)
+		close(fastc)
+	}()
+
+	if err := <-errc; err != nil {
+		t.Errorf("connection failed: %v", err)
+	}
+
+	p, err := mt.Accept(peerConfig{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if have, want := p.NodeInfo(), fastNodeInfo; !reflect.DeepEqual(have, want) {
+		t.Errorf("have %v, want %v", have, want)
+	}
+}
+
+func TestTransportMultiplexValidateNodeInfo(t *testing.T) {
+	mt := testSetupMultiplexTransport(t)
+
+	errc := make(chan error)
+
+	go func() {
+		var (
+			pv     = ed25519.GenPrivKey()
+			dialer = NewMultiplexTransport(
+				NodeInfo{
+					ID:         PubKeyToID(pv.PubKey()),
+					ListenAddr: "127.0.0.1:0",
+					Moniker:    "", // Should not be empty.
+					Version:    "1.0.0",
+				},
+				NodeKey{
+					PrivKey: pv,
+				},
+			)
+		)
+
+		addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		_, err = dialer.Dial(*addr, peerConfig{})
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		close(errc)
+	}()
+
+	if err := <-errc; err != nil {
+		t.Errorf("connection failed: %v", err)
+	}
+
+	_, err := mt.Accept(peerConfig{})
+	if err, ok := err.(ErrRejected); ok {
+		if !err.IsNodeInfoInvalid() {
+			t.Errorf("expected NodeInfo to be invalid")
+		}
+	} else {
+		t.Errorf("expected ErrRejected")
+	}
+}
+
+func TestTransportMultiplexRejectMissmatchID(t *testing.T) {
+	mt := testSetupMultiplexTransport(t)
+
+	errc := make(chan error)
+
+	go func() {
+		dialer := NewMultiplexTransport(
+			NodeInfo{
+				ID:         PubKeyToID(ed25519.GenPrivKey().PubKey()),
+				ListenAddr: "127.0.0.1:0",
+				Moniker:    "dialer",
+				Version:    "1.0.0",
+			},
+			NodeKey{
+				PrivKey: ed25519.GenPrivKey(),
+			},
+		)
+
+		addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		_, err = dialer.Dial(*addr, peerConfig{})
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		close(errc)
+	}()
+
+	if err := <-errc; err != nil {
+		t.Errorf("connection failed: %v", err)
+	}
+
+	_, err := mt.Accept(peerConfig{})
+	if err, ok := err.(ErrRejected); ok {
+		if !err.IsAuthFailure() {
+			t.Errorf("expected auth failure")
+		}
+	} else {
+		t.Errorf("expected ErrRejected")
+	}
+}
+
+func TestTransportMultiplexRejectIncompatible(t *testing.T) {
+	mt := testSetupMultiplexTransport(t)
+
+	errc := make(chan error)
+
+	go func() {
+		var (
+			pv     = ed25519.GenPrivKey()
+			dialer = NewMultiplexTransport(
+				NodeInfo{
+					ID:         PubKeyToID(pv.PubKey()),
+					ListenAddr: "127.0.0.1:0",
+					Moniker:    "dialer",
+					Version:    "2.0.0",
+				},
+				NodeKey{
+					PrivKey: pv,
+				},
+			)
+		)
+
+		addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		_, err = dialer.Dial(*addr, peerConfig{})
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		close(errc)
+	}()
+
+	_, err := mt.Accept(peerConfig{})
+	if err, ok := err.(ErrRejected); ok {
+		if !err.IsIncompatible() {
+			t.Errorf("expected to reject incompatible")
+		}
+	} else {
+		t.Errorf("expected ErrRejected")
+	}
+}
+
+func TestTransportMultiplexRejectSelf(t *testing.T) {
+	mt := testSetupMultiplexTransport(t)
+
+	errc := make(chan error)
+
+	go func() {
+		addr, err := NewNetAddressStringWithOptionalID(mt.listener.Addr().String())
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		_, err = mt.Dial(*addr, peerConfig{})
+		if err != nil {
+			errc <- err
+			return
+		}
+
+		close(errc)
+	}()
+
+	if err := <-errc; err != nil {
+		if err, ok := err.(ErrRejected); ok {
+			if !err.IsSelf() {
+				t.Errorf("expected to reject self")
+			}
+		} else {
+			t.Errorf("expected ErrRejected")
+		}
+	} else {
+		t.Errorf("expected connection failure")
+	}
+
+	_, err := mt.Accept(peerConfig{})
+	if err, ok := err.(ErrRejected); ok {
+		if !err.IsSelf() {
+			t.Errorf("expected to reject self")
+		}
+	} else {
+		t.Errorf("expected ErrRejected")
+	}
+}
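The rejection tests above all branch on ErrRejected's predicate methods. A sketch of how a caller might classify rejections, limited to the predicates exercised in this file (classifyRejection is an invented helper, not part of the change set):

package p2p

func classifyRejection(err error) string {
	e, ok := err.(ErrRejected)
	if !ok {
		return "not a rejection"
	}

	switch {
	case e.IsSelf():
		return "dialed ourselves"
	case e.IsFiltered():
		return "rejected by a connection filter"
	case e.IsAuthFailure():
		return "secret connection or handshake failed"
	case e.IsIncompatible():
		return "incompatible NodeInfo"
	case e.IsNodeInfoInvalid():
		return "invalid NodeInfo"
	default:
		return "rejected"
	}
}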
+
+func TestTransportConnDuplicateIPFilter(t *testing.T) {
+	filter := ConnDuplicateIPFilter()
+
+	if err := filter(nil, &testTransportConn{}, nil); err != nil {
+		t.Fatal(err)
+	}
+
+	var (
+		c  = &testTransportConn{}
+		cs = NewConnSet()
+	)
+
+	cs.Set(c, []net.IP{
+		net.IP{10, 0, 10, 1},
+		net.IP{10, 0, 10, 2},
+		net.IP{10, 0, 10, 3},
+	})
+
+	if err := filter(cs, c, []net.IP{
+		net.IP{10, 0, 10, 2},
+	}); err == nil {
+		t.Errorf("expected Peer to be rejected as duplicate")
+	}
+}
+
+func TestTransportHandshake(t *testing.T) {
+	ln, err := net.Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var (
+		peerPV       = ed25519.GenPrivKey()
+		peerNodeInfo = NodeInfo{
+			ID: PubKeyToID(peerPV.PubKey()),
+		}
+	)
+
+	go func() {
+		c, err := net.Dial(ln.Addr().Network(), ln.Addr().String())
+		if err != nil {
+			t.Error(err)
+			return
+		}
+
+		go func(c net.Conn) {
+			_, err := cdc.MarshalBinaryWriter(c, peerNodeInfo)
+			if err != nil {
+				t.Error(err)
+			}
+		}(c)
+		go func(c net.Conn) {
+			ni := NodeInfo{}
+
+			_, err := cdc.UnmarshalBinaryReader(
+				c,
+				&ni,
+				int64(MaxNodeInfoSize()),
+			)
+			if err != nil {
+				t.Error(err)
+			}
+		}(c)
+	}()
+
+	c, err := ln.Accept()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	ni, err := handshake(c, 20*time.Millisecond, NodeInfo{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if have, want := ni, peerNodeInfo; !reflect.DeepEqual(have, want) {
+		t.Errorf("have %v, want %v", have, want)
+	}
+}
+
+func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport {
+	var (
+		pv = ed25519.GenPrivKey()
+		mt = NewMultiplexTransport(
+			NodeInfo{
+				ID:         PubKeyToID(pv.PubKey()),
+				ListenAddr: "127.0.0.1:0",
+				Moniker:    "transport",
+				Version:    "1.0.0",
+			},
+			NodeKey{
+				PrivKey: pv,
+			},
+		)
+	)
+
+	addr, err := NewNetAddressStringWithOptionalID("127.0.0.1:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := mt.Listen(*addr); err != nil {
+		t.Fatal(err)
+	}
+
+	return mt
+}
+
+type testTransportAddr struct{}
+
+func (a *testTransportAddr) Network() string { return "tcp" }
+func (a *testTransportAddr) String() string  { return "test.local:1234" }
+
+type testTransportConn struct{}
+
+func (c *testTransportConn) Close() error {
+	return fmt.Errorf("Close() not implemented")
+}
+
+func (c *testTransportConn) LocalAddr() net.Addr {
+	return &testTransportAddr{}
+}
+
+func (c *testTransportConn) RemoteAddr() net.Addr {
+	return &testTransportAddr{}
+}
+
+func (c *testTransportConn) Read(_ []byte) (int, error) {
+	return -1, fmt.Errorf("Read() not implemented")
+}
+
+func (c *testTransportConn) SetDeadline(_ time.Time) error {
+	return fmt.Errorf("SetDeadline() not implemented")
+}
+
+func (c *testTransportConn) SetReadDeadline(_ time.Time) error {
+	return fmt.Errorf("SetReadDeadline() not implemented")
+}
+
+func (c *testTransportConn) SetWriteDeadline(_ time.Time) error {
+	return fmt.Errorf("SetWriteDeadline() not implemented")
+}
+
+func (c *testTransportConn) Write(_ []byte) (int, error) {
+	return -1, fmt.Errorf("Write() not implemented")
+}
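The IPResolver seam also makes the duplicate-IP path testable without real DNS. A sketch under that assumption (stubIPResolver and the wiring below are illustrative, not part of the change set); the options would have to be applied before Listen so that acceptPeers sees them:

package p2p

import (
	"context"
	"net"
)

// stubIPResolver answers every lookup with a fixed set of addresses.
type stubIPResolver struct {
	ips []net.IPAddr
}

func (r stubIPResolver) LookupIPAddr(_ context.Context, _ string) ([]net.IPAddr, error) {
	return r.ips, nil
}

// Possible wiring inside a test, before calling mt.Listen:
//
//	mt := NewMultiplexTransport(nodeInfo, nodeKey)
//	MultiplexTransportResolver(stubIPResolver{
//		ips: []net.IPAddr{{IP: net.IP{10, 0, 10, 1}}},
//	})(mt)
//	MultiplexTransportConnFilters(ConnDuplicateIPFilter())(mt)
//
// Every accepted connection then appears to resolve to 10.0.10.1, so a second
// dialer is rejected as a duplicate regardless of its real remote address.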