diff --git a/common/repeat_timer.go b/common/repeat_timer.go index 2822b968..e2aa18ea 100644 --- a/common/repeat_timer.go +++ b/common/repeat_timer.go @@ -53,7 +53,12 @@ func (t *RepeatTimer) Reset() { go t.fireRoutine(t.ticker) } +// For ease of .Stop()'ing services before .Start()'ing them, +// we ignore .Stop()'s on nil RepeatTimers. func (t *RepeatTimer) Stop() bool { + if t == nil { + return false + } t.mtx.Lock() // Lock defer t.mtx.Unlock() diff --git a/common/service.go b/common/service.go new file mode 100644 index 00000000..686686f2 --- /dev/null +++ b/common/service.go @@ -0,0 +1,63 @@ +package common + +import "sync/atomic" + +// BaseService represents a service that can be started then stopped, +// but cannot be restarted. +// .Start() calls the onStart callback function, and .Stop() calls onStop. +// It is meant to be embedded into service structs. +// The user must ensure that Start() and Stop() are not called concurrently. +// It is ok to call Stop() without calling Start() first -- the onStop +// callback will be called, and the service will never start. +type BaseService struct { + name string + service interface{} // for log statements. + started uint32 // atomic + stopped uint32 // atomic + onStart func() + onStop func() +} + +func NewBaseService(name string, service interface{}, onStart, onStop func()) *BaseService { + return &BaseService{ + name: name, + service: service, + onStart: onStart, + onStop: onStop, + } +} + +func (bs *BaseService) Start() bool { + if atomic.CompareAndSwapUint32(&bs.started, 0, 1) { + if atomic.LoadUint32(&bs.stopped) == 1 { + log.Warn(Fmt("Not starting %v -- already stopped", bs.name), "service", bs.service) + return false + } else { + log.Notice(Fmt("Starting %v", bs.name), "service", bs.service) + } + if bs.onStart != nil { + bs.onStart() + } + return true + } else { + log.Info(Fmt("Not starting %v -- already started", bs.name), "service", bs.service) + return false + } +} + +func (bs *BaseService) Stop() bool { + if atomic.CompareAndSwapUint32(&bs.stopped, 0, 1) { + log.Notice(Fmt("Stopping %v", bs.name), "service", bs.service) + if bs.onStop != nil { + bs.onStop() + } + return true + } else { + log.Notice(Fmt("Not stopping %v", bs.name), "service", bs.service) + return false + } +} + +func (bs *BaseService) IsRunning() bool { + return atomic.LoadUint32(&bs.started) == 1 && atomic.LoadUint32(&bs.stopped) == 0 +} diff --git a/p2p/connection.go b/p2p/connection.go index c095c166..cd9be108 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -59,26 +59,27 @@ queue is full. Inbound message bytes are handled with an onReceive callback function. */ type MConnection struct { - conn net.Conn - bufReader *bufio.Reader - bufWriter *bufio.Writer - sendMonitor *flow.Monitor - recvMonitor *flow.Monitor - sendRate int64 - recvRate int64 - flushTimer *ThrottleTimer // flush writes as necessary but throttled. - send chan struct{} + BaseService + + conn net.Conn + bufReader *bufio.Reader + bufWriter *bufio.Writer + sendMonitor *flow.Monitor + recvMonitor *flow.Monitor + sendRate int64 + recvRate int64 + send chan struct{} + pong chan struct{} + channels []*Channel + channelsIdx map[byte]*Channel + onReceive receiveCbFunc + onError errorCbFunc + errored uint32 + quit chan struct{} - pingTimer *RepeatTimer // send pings periodically - pong chan struct{} - chStatsTimer *RepeatTimer // update channel stats periodically - channels []*Channel - channelsIdx map[byte]*Channel - onReceive receiveCbFunc - onError errorCbFunc - started uint32 - stopped uint32 - errored uint32 + flushTimer *ThrottleTimer // flush writes as necessary but throttled. + pingTimer *RepeatTimer // send pings periodically + chStatsTimer *RepeatTimer // update channel stats periodically LocalAddress *NetAddress RemoteAddress *NetAddress @@ -87,21 +88,24 @@ type MConnection struct { func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection { mconn := &MConnection{ - conn: conn, - bufReader: bufio.NewReaderSize(conn, minReadBufferSize), - bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize), - sendMonitor: flow.New(0, 0), - recvMonitor: flow.New(0, 0), - sendRate: defaultSendRate, - recvRate: defaultRecvRate, - flushTimer: NewThrottleTimer("flush", flushThrottleMS*time.Millisecond), - send: make(chan struct{}, 1), - quit: make(chan struct{}), - pingTimer: NewRepeatTimer("ping", pingTimeoutSeconds*time.Second), - pong: make(chan struct{}), - chStatsTimer: NewRepeatTimer("chStats", updateStatsSeconds*time.Second), - onReceive: onReceive, - onError: onError, + conn: conn, + bufReader: bufio.NewReaderSize(conn, minReadBufferSize), + bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize), + sendMonitor: flow.New(0, 0), + recvMonitor: flow.New(0, 0), + sendRate: defaultSendRate, + recvRate: defaultRecvRate, + send: make(chan struct{}, 1), + pong: make(chan struct{}), + onReceive: onReceive, + onError: onError, + + // Initialized in Start() + quit: nil, + flushTimer: nil, + pingTimer: nil, + chStatsTimer: nil, + LocalAddress: NewNetAddress(conn.LocalAddr()), RemoteAddress: NewNetAddress(conn.RemoteAddr()), } @@ -118,32 +122,33 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei mconn.channels = channels mconn.channelsIdx = channelsIdx + mconn.BaseService = *NewBaseService("MConnection", mconn, mconn.onStart, mconn.onStop) + return mconn } -// .Start() begins multiplexing packets to and from "channels". -func (c *MConnection) Start() { - if atomic.CompareAndSwapUint32(&c.started, 0, 1) { - log.Info("Starting MConnection", "connection", c) - go c.sendRoutine() - go c.recvRoutine() - } +func (c *MConnection) onStart() { + c.quit = make(chan struct{}) + go c.sendRoutine() + go c.recvRoutine() + c.flushTimer = NewThrottleTimer("flush", flushThrottleMS*time.Millisecond) + c.pingTimer = NewRepeatTimer("ping", pingTimeoutSeconds*time.Second) + c.chStatsTimer = NewRepeatTimer("chStats", updateStatsSeconds*time.Second) } -func (c *MConnection) Stop() { - if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) { - log.Info("Stopping MConnection", "connection", c) +func (c *MConnection) onStop() { + c.flushTimer.Stop() + c.pingTimer.Stop() + c.chStatsTimer.Stop() + if c.quit != nil { close(c.quit) - c.conn.Close() - c.flushTimer.Stop() - c.chStatsTimer.Stop() - c.pingTimer.Stop() - // We can't close pong safely here because - // recvRoutine may write to it after we've stopped. - // Though it doesn't need to get closed at all, - // we close it @ recvRoutine. - // close(c.pong) } + c.conn.Close() + // We can't close pong safely here because + // recvRoutine may write to it after we've stopped. + // Though it doesn't need to get closed at all, + // we close it @ recvRoutine. + // close(c.pong) } func (c *MConnection) String() string { @@ -178,11 +183,11 @@ func (c *MConnection) stopForError(r interface{}) { // Queues a message to be sent to channel. func (c *MConnection) Send(chId byte, msg interface{}) bool { - if atomic.LoadUint32(&c.stopped) == 1 { + if !c.IsRunning() { return false } - log.Info("Send", "channel", chId, "connection", c, "msg", msg) //, "bytes", binary.BinaryBytes(msg)) + log.Info("Send", "channel", chId, "conn", c, "msg", msg) //, "bytes", binary.BinaryBytes(msg)) // Send message to channel. channel, ok := c.channelsIdx[chId] @@ -199,7 +204,7 @@ func (c *MConnection) Send(chId byte, msg interface{}) bool { default: } } else { - log.Warn("Send failed", "channel", chId, "connection", c, "msg", msg) + log.Warn("Send failed", "channel", chId, "conn", c, "msg", msg) } return success } @@ -207,11 +212,11 @@ func (c *MConnection) Send(chId byte, msg interface{}) bool { // Queues a message to be sent to channel. // Nonblocking, returns true if successful. func (c *MConnection) TrySend(chId byte, msg interface{}) bool { - if atomic.LoadUint32(&c.stopped) == 1 { + if !c.IsRunning() { return false } - log.Info("TrySend", "channel", chId, "connection", c, "msg", msg) + log.Info("TrySend", "channel", chId, "conn", c, "msg", msg) // Send message to channel. channel, ok := c.channelsIdx[chId] @@ -233,7 +238,7 @@ func (c *MConnection) TrySend(chId byte, msg interface{}) bool { } func (c *MConnection) CanSend(chId byte) bool { - if atomic.LoadUint32(&c.stopped) == 1 { + if !c.IsRunning() { return false } @@ -286,11 +291,11 @@ FOR_LOOP: } } - if atomic.LoadUint32(&c.stopped) == 1 { + if !c.IsRunning() { break FOR_LOOP } if err != nil { - log.Warn("Connection failed @ sendRoutine", "connection", c, "error", err) + log.Warn("Connection failed @ sendRoutine", "conn", c, "error", err) c.stopForError(err) break FOR_LOOP } @@ -386,8 +391,8 @@ FOR_LOOP: pktType := binary.ReadByte(c.bufReader, &n, &err) c.recvMonitor.Update(int(n)) if err != nil { - if atomic.LoadUint32(&c.stopped) != 1 { - log.Warn("Connection failed @ recvRoutine (reading byte)", "connection", c, "error", err) + if !c.IsRunning() { + log.Warn("Connection failed @ recvRoutine (reading byte)", "conn", c, "error", err) c.stopForError(err) } break FOR_LOOP @@ -407,8 +412,9 @@ FOR_LOOP: binary.ReadBinaryPtr(&pkt, c.bufReader, &n, &err) c.recvMonitor.Update(int(n)) if err != nil { - if atomic.LoadUint32(&c.stopped) != 1 { - log.Warn("Connection failed @ recvRoutine", "connection", c, "error", err) + if !c.IsRunning() { + + log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err) c.stopForError(err) } break FOR_LOOP @@ -419,8 +425,9 @@ FOR_LOOP: } msgBytes, err := channel.recvMsgPacket(pkt) if err != nil { - if atomic.LoadUint32(&c.stopped) != 1 { - log.Warn("Connection failed @ recvRoutine", "connection", c, "error", err) + if !c.IsRunning() { + + log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err) c.stopForError(err) } break FOR_LOOP diff --git a/p2p/peer.go b/p2p/peer.go index 776e1f66..682d8f7d 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -12,9 +12,10 @@ import ( ) type Peer struct { + BaseService + outbound bool mconn *MConnection - running uint32 *types.NodeInfo Key string @@ -64,30 +65,20 @@ func newPeer(conn net.Conn, peerNodeInfo *types.NodeInfo, outbound bool, reactor p = &Peer{ outbound: outbound, mconn: mconn, - running: 0, NodeInfo: peerNodeInfo, Key: peerNodeInfo.PubKey.KeyString(), Data: NewCMap(), } + p.BaseService = *NewBaseService("Peer", p, p.onStart, p.onStop) return p } -func (p *Peer) start() { - if atomic.CompareAndSwapUint32(&p.running, 0, 1) { - log.Info("Starting Peer", "peer", p) - p.mconn.Start() - } +func (p *Peer) onStart() { + p.mconn.Start() } -func (p *Peer) stop() { - if atomic.CompareAndSwapUint32(&p.running, 1, 0) { - log.Info("Stopping Peer", "peer", p) - p.mconn.Stop() - } -} - -func (p *Peer) IsRunning() bool { - return atomic.LoadUint32(&p.running) == 1 +func (p *Peer) onStop() { + p.mconn.Stop() } func (p *Peer) Connection() *MConnection { @@ -99,21 +90,21 @@ func (p *Peer) IsOutbound() bool { } func (p *Peer) Send(chId byte, msg interface{}) bool { - if atomic.LoadUint32(&p.running) == 0 { + if !p.IsRunning() { return false } return p.mconn.Send(chId, msg) } func (p *Peer) TrySend(chId byte, msg interface{}) bool { - if atomic.LoadUint32(&p.running) == 0 { + if !p.IsRunning() { return false } return p.mconn.TrySend(chId, msg) } func (p *Peer) CanSend(chId byte) bool { - if atomic.LoadUint32(&p.running) == 0 { + if !p.IsRunning() { return false } return p.mconn.CanSend(chId)