From 0d5e0d2f1326e61c81a417c76fee6f83ffb44e14 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 16 Nov 2018 17:44:19 -0500 Subject: [PATCH] p2p/conn: FlushStop. Use in pex. Closes #2092 (#2802) * p2p/conn: FlushStop. Use in pex. Closes #2092 In seed mode, we call StopPeer immediately after Send. Since flushing msgs to the peer happens in the background, the peer connection is often closed before the messages are actually sent out. The new FlushStop method allows all msgs to first be written and flushed out on the conn before it is closed. * fix dummy peer * typo * fixes from review * more comments * ensure pex doesn't call FlushStop more than once FlushStop is not safe to call more than once, but we call it from Receive in a go-routine so Receive doesn't block. To ensure we only call it once, we use the lastReceivedRequests map - if an entry already exists, then FlushStop should already have been called and we can return. --- blockchain/reactor_test.go | 1 + p2p/conn/connection.go | 64 +++++++++++++++++++++++++++++++++---- p2p/conn/connection_test.go | 37 +++++++++++++++++++++ p2p/dummy/peer.go | 5 +++ p2p/peer.go | 10 ++++++ p2p/peer_set_test.go | 1 + p2p/pex/pex_reactor.go | 33 +++++++++++++------ p2p/pex/pex_reactor_test.go | 1 + 8 files changed, 135 insertions(+), 17 deletions(-) diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index fca063e0..9b26f919 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -197,6 +197,7 @@ func (tp *bcrTestPeer) TrySend(chID byte, msgBytes []byte) bool { return true } +func (tp *bcrTestPeer) FlushStop() {} func (tp *bcrTestPeer) Send(chID byte, msgBytes []byte) bool { return tp.TrySend(chID, msgBytes) } func (tp *bcrTestPeer) NodeInfo() p2p.NodeInfo { return p2p.DefaultNodeInfo{} } func (tp *bcrTestPeer) Status() p2p.ConnectionStatus { return p2p.ConnectionStatus{} } diff --git a/p2p/conn/connection.go b/p2p/conn/connection.go index 89282b00..c6aad038 100644 --- a/p2p/conn/connection.go +++ b/p2p/conn/connection.go @@ -84,7 +84,11 @@ type MConnection struct { errored uint32 config MConnConfig - quit chan struct{} + // Closing quitSendRoutine will cause + // doneSendRoutine to close. + quitSendRoutine chan struct{} + doneSendRoutine chan struct{} + flushTimer *cmn.ThrottleTimer // flush writes as necessary but throttled. pingTimer *cmn.RepeatTimer // send pings periodically @@ -190,7 +194,8 @@ func (c *MConnection) OnStart() error { if err := c.BaseService.OnStart(); err != nil { return err } - c.quit = make(chan struct{}) + c.quitSendRoutine = make(chan struct{}) + c.doneSendRoutine = make(chan struct{}) c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) c.pingTimer = cmn.NewRepeatTimer("ping", c.config.PingInterval) c.pongTimeoutCh = make(chan bool, 1) @@ -200,15 +205,59 @@ func (c *MConnection) OnStart() error { return nil } -// OnStop implements BaseService -func (c *MConnection) OnStop() { +// FlushStop replicates the logic of OnStop. +// It additionally ensures that all successful +// .Send() calls will get flushed before closing +// the connection. +// NOTE: it is not safe to call this method more than once. +func (c *MConnection) FlushStop() { c.BaseService.OnStop() c.flushTimer.Stop() c.pingTimer.Stop() c.chStatsTimer.Stop() - if c.quit != nil { - close(c.quit) + if c.quitSendRoutine != nil { + close(c.quitSendRoutine) + // wait until the sendRoutine exits + // so we dont race on calling sendSomePacketMsgs + <-c.doneSendRoutine } + + // Send and flush all pending msgs. + // By now, IsRunning == false, + // so any concurrent attempts to send will fail. + // Since sendRoutine has exited, we can call this + // safely + eof := c.sendSomePacketMsgs() + for !eof { + eof = c.sendSomePacketMsgs() + } + c.flush() + + // Now we can close the connection + c.conn.Close() // nolint: errcheck + + // We can't close pong safely here because + // recvRoutine may write to it after we've stopped. + // Though it doesn't need to get closed at all, + // we close it @ recvRoutine. + + // c.Stop() +} + +// OnStop implements BaseService +func (c *MConnection) OnStop() { + select { + case <-c.quitSendRoutine: + // already quit via FlushStop + return + default: + } + + c.BaseService.OnStop() + c.flushTimer.Stop() + c.pingTimer.Stop() + c.chStatsTimer.Stop() + close(c.quitSendRoutine) c.conn.Close() // nolint: errcheck // We can't close pong safely here because @@ -365,7 +414,8 @@ FOR_LOOP: } c.sendMonitor.Update(int(_n)) c.flush() - case <-c.quit: + case <-c.quitSendRoutine: + close(c.doneSendRoutine) break FOR_LOOP case <-c.send: // Send some PacketMsgs diff --git a/p2p/conn/connection_test.go b/p2p/conn/connection_test.go index 59fe0d1d..a757f07a 100644 --- a/p2p/conn/connection_test.go +++ b/p2p/conn/connection_test.go @@ -36,6 +36,43 @@ func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msg return c } +func TestMConnectionSendFlushStop(t *testing.T) { + server, client := NetPipe() + defer server.Close() // nolint: errcheck + defer client.Close() // nolint: errcheck + + clientConn := createTestMConnection(client) + err := clientConn.Start() + require.Nil(t, err) + defer clientConn.Stop() + + msg := []byte("abc") + assert.True(t, clientConn.Send(0x01, msg)) + + aminoMsgLength := 14 + + // start the reader in a new routine, so we can flush + errCh := make(chan error) + go func() { + msgB := make([]byte, aminoMsgLength) + _, err := server.Read(msgB) + if err != nil { + t.Fatal(err) + } + errCh <- err + }() + + // stop the conn - it should flush all conns + clientConn.FlushStop() + + timer := time.NewTimer(3 * time.Second) + select { + case <-errCh: + case <-timer.C: + t.Error("timed out waiting for msgs to be read") + } +} + func TestMConnectionSend(t *testing.T) { server, client := NetPipe() defer server.Close() // nolint: errcheck diff --git a/p2p/dummy/peer.go b/p2p/dummy/peer.go index 65ff65fb..71def27e 100644 --- a/p2p/dummy/peer.go +++ b/p2p/dummy/peer.go @@ -25,6 +25,11 @@ func NewPeer() *peer { return p } +// FlushStop just calls Stop. +func (p *peer) FlushStop() { + p.Stop() +} + // ID always returns dummy. func (p *peer) ID() p2p.ID { return p2p.ID("dummy") diff --git a/p2p/peer.go b/p2p/peer.go index e98c16d2..6417948d 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -17,6 +17,7 @@ const metricsTickerDuration = 10 * time.Second // Peer is an interface representing a peer connected on a reactor. type Peer interface { cmn.Service + FlushStop() ID() ID // peer's cryptographic ID RemoteIP() net.IP // remote IP of the connection @@ -184,6 +185,15 @@ func (p *peer) OnStart() error { return nil } +// FlushStop mimics OnStop but additionally ensures that all successful +// .Send() calls will get flushed before closing the connection. +// NOTE: it is not safe to call this method more than once. +func (p *peer) FlushStop() { + p.metricsTicker.Stop() + p.BaseService.OnStop() + p.mconn.FlushStop() // stop everything and close the conn +} + // OnStop implements BaseService. func (p *peer) OnStop() { p.metricsTicker.Stop() diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index daa9b2c8..04b877b0 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -18,6 +18,7 @@ type mockPeer struct { id ID } +func (mp *mockPeer) FlushStop() { mp.Stop() } func (mp *mockPeer) TrySend(chID byte, msgBytes []byte) bool { return true } func (mp *mockPeer) Send(chID byte, msgBytes []byte) bool { return true } func (mp *mockPeer) NodeInfo() NodeInfo { return DefaultNodeInfo{} } diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go index 85d292b0..057aadaa 100644 --- a/p2p/pex/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -208,25 +208,38 @@ func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) { switch msg := msg.(type) { case *pexRequestMessage: - // Check we're not receiving too many requests - if err := r.receiveRequest(src); err != nil { - r.Switch.StopPeerForError(src, err) - return - } - // Seeds disconnect after sending a batch of addrs - // NOTE: this is a prime candidate for amplification attacks + // NOTE: this is a prime candidate for amplification attacks, // so it's important we // 1) restrict how frequently peers can request // 2) limit the output size - if r.config.SeedMode { + + // If we're a seed and this is an inbound peer, + // respond once and disconnect. + if r.config.SeedMode && !src.IsOutbound() { + id := string(src.ID()) + v := r.lastReceivedRequests.Get(id) + if v != nil { + // FlushStop/StopPeer are already + // running in a go-routine. + return + } + r.lastReceivedRequests.Set(id, time.Now()) + + // Send addrs and disconnect r.SendAddrs(src, r.book.GetSelectionWithBias(biasToSelectNewPeers)) go func() { - // TODO Fix properly #2092 - time.Sleep(time.Second * 5) + // In a go-routine so it doesn't block .Receive. + src.FlushStop() r.Switch.StopPeerGracefully(src) }() + } else { + // Check we're not receiving requests too frequently. + if err := r.receiveRequest(src); err != nil { + r.Switch.StopPeerForError(src, err) + return + } r.SendAddrs(src, r.book.GetSelection()) } diff --git a/p2p/pex/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go index 9d3f49bb..8f3ceb89 100644 --- a/p2p/pex/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -387,6 +387,7 @@ func newMockPeer() mockPeer { return mp } +func (mp mockPeer) FlushStop() { mp.Stop() } func (mp mockPeer) ID() p2p.ID { return mp.addr.ID } func (mp mockPeer) IsOutbound() bool { return mp.outbound } func (mp mockPeer) IsPersistent() bool { return mp.persistent }