base_service draft and some samples

This commit is contained in:
Jae Kwon
2015-07-19 17:42:01 -07:00
parent 111f001767
commit c30d38270c
4 changed files with 152 additions and 86 deletions

View File

@ -53,7 +53,12 @@ func (t *RepeatTimer) Reset() {
go t.fireRoutine(t.ticker) go t.fireRoutine(t.ticker)
} }
// For ease of .Stop()'ing services before .Start()'ing them,
// we ignore .Stop()'s on nil RepeatTimers.
func (t *RepeatTimer) Stop() bool { func (t *RepeatTimer) Stop() bool {
if t == nil {
return false
}
t.mtx.Lock() // Lock t.mtx.Lock() // Lock
defer t.mtx.Unlock() defer t.mtx.Unlock()

63
common/service.go Normal file
View File

@ -0,0 +1,63 @@
package common
import "sync/atomic"
// BaseService represents a service that can be started then stopped,
// but cannot be restarted.
// .Start() calls the onStart callback function, and .Stop() calls onStop.
// It is meant to be embedded into service structs.
// The user must ensure that Start() and Stop() are not called concurrently.
// It is ok to call Stop() without calling Start() first -- the onStop
// callback will be called, and the service will never start.
type BaseService struct {
name string
service interface{} // for log statements.
started uint32 // atomic
stopped uint32 // atomic
onStart func()
onStop func()
}
func NewBaseService(name string, service interface{}, onStart, onStop func()) *BaseService {
return &BaseService{
name: name,
service: service,
onStart: onStart,
onStop: onStop,
}
}
func (bs *BaseService) Start() bool {
if atomic.CompareAndSwapUint32(&bs.started, 0, 1) {
if atomic.LoadUint32(&bs.stopped) == 1 {
log.Warn(Fmt("Not starting %v -- already stopped", bs.name), "service", bs.service)
return false
} else {
log.Notice(Fmt("Starting %v", bs.name), "service", bs.service)
}
if bs.onStart != nil {
bs.onStart()
}
return true
} else {
log.Info(Fmt("Not starting %v -- already started", bs.name), "service", bs.service)
return false
}
}
func (bs *BaseService) Stop() bool {
if atomic.CompareAndSwapUint32(&bs.stopped, 0, 1) {
log.Notice(Fmt("Stopping %v", bs.name), "service", bs.service)
if bs.onStop != nil {
bs.onStop()
}
return true
} else {
log.Notice(Fmt("Not stopping %v", bs.name), "service", bs.service)
return false
}
}
func (bs *BaseService) IsRunning() bool {
return atomic.LoadUint32(&bs.started) == 1 && atomic.LoadUint32(&bs.stopped) == 0
}

View File

@ -59,26 +59,27 @@ queue is full.
Inbound message bytes are handled with an onReceive callback function. Inbound message bytes are handled with an onReceive callback function.
*/ */
type MConnection struct { type MConnection struct {
conn net.Conn BaseService
bufReader *bufio.Reader
bufWriter *bufio.Writer conn net.Conn
sendMonitor *flow.Monitor bufReader *bufio.Reader
recvMonitor *flow.Monitor bufWriter *bufio.Writer
sendRate int64 sendMonitor *flow.Monitor
recvRate int64 recvMonitor *flow.Monitor
flushTimer *ThrottleTimer // flush writes as necessary but throttled. sendRate int64
send chan struct{} recvRate int64
send chan struct{}
pong chan struct{}
channels []*Channel
channelsIdx map[byte]*Channel
onReceive receiveCbFunc
onError errorCbFunc
errored uint32
quit chan struct{} quit chan struct{}
pingTimer *RepeatTimer // send pings periodically flushTimer *ThrottleTimer // flush writes as necessary but throttled.
pong chan struct{} pingTimer *RepeatTimer // send pings periodically
chStatsTimer *RepeatTimer // update channel stats periodically chStatsTimer *RepeatTimer // update channel stats periodically
channels []*Channel
channelsIdx map[byte]*Channel
onReceive receiveCbFunc
onError errorCbFunc
started uint32
stopped uint32
errored uint32
LocalAddress *NetAddress LocalAddress *NetAddress
RemoteAddress *NetAddress RemoteAddress *NetAddress
@ -87,21 +88,24 @@ type MConnection struct {
func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection { func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc) *MConnection {
mconn := &MConnection{ mconn := &MConnection{
conn: conn, conn: conn,
bufReader: bufio.NewReaderSize(conn, minReadBufferSize), bufReader: bufio.NewReaderSize(conn, minReadBufferSize),
bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize), bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
sendMonitor: flow.New(0, 0), sendMonitor: flow.New(0, 0),
recvMonitor: flow.New(0, 0), recvMonitor: flow.New(0, 0),
sendRate: defaultSendRate, sendRate: defaultSendRate,
recvRate: defaultRecvRate, recvRate: defaultRecvRate,
flushTimer: NewThrottleTimer("flush", flushThrottleMS*time.Millisecond), send: make(chan struct{}, 1),
send: make(chan struct{}, 1), pong: make(chan struct{}),
quit: make(chan struct{}), onReceive: onReceive,
pingTimer: NewRepeatTimer("ping", pingTimeoutSeconds*time.Second), onError: onError,
pong: make(chan struct{}),
chStatsTimer: NewRepeatTimer("chStats", updateStatsSeconds*time.Second), // Initialized in Start()
onReceive: onReceive, quit: nil,
onError: onError, flushTimer: nil,
pingTimer: nil,
chStatsTimer: nil,
LocalAddress: NewNetAddress(conn.LocalAddr()), LocalAddress: NewNetAddress(conn.LocalAddr()),
RemoteAddress: NewNetAddress(conn.RemoteAddr()), RemoteAddress: NewNetAddress(conn.RemoteAddr()),
} }
@ -118,32 +122,33 @@ func NewMConnection(conn net.Conn, chDescs []*ChannelDescriptor, onReceive recei
mconn.channels = channels mconn.channels = channels
mconn.channelsIdx = channelsIdx mconn.channelsIdx = channelsIdx
mconn.BaseService = *NewBaseService("MConnection", mconn, mconn.onStart, mconn.onStop)
return mconn return mconn
} }
// .Start() begins multiplexing packets to and from "channels". func (c *MConnection) onStart() {
func (c *MConnection) Start() { c.quit = make(chan struct{})
if atomic.CompareAndSwapUint32(&c.started, 0, 1) { go c.sendRoutine()
log.Info("Starting MConnection", "connection", c) go c.recvRoutine()
go c.sendRoutine() c.flushTimer = NewThrottleTimer("flush", flushThrottleMS*time.Millisecond)
go c.recvRoutine() c.pingTimer = NewRepeatTimer("ping", pingTimeoutSeconds*time.Second)
} c.chStatsTimer = NewRepeatTimer("chStats", updateStatsSeconds*time.Second)
} }
func (c *MConnection) Stop() { func (c *MConnection) onStop() {
if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) { c.flushTimer.Stop()
log.Info("Stopping MConnection", "connection", c) c.pingTimer.Stop()
c.chStatsTimer.Stop()
if c.quit != nil {
close(c.quit) close(c.quit)
c.conn.Close()
c.flushTimer.Stop()
c.chStatsTimer.Stop()
c.pingTimer.Stop()
// We can't close pong safely here because
// recvRoutine may write to it after we've stopped.
// Though it doesn't need to get closed at all,
// we close it @ recvRoutine.
// close(c.pong)
} }
c.conn.Close()
// We can't close pong safely here because
// recvRoutine may write to it after we've stopped.
// Though it doesn't need to get closed at all,
// we close it @ recvRoutine.
// close(c.pong)
} }
func (c *MConnection) String() string { func (c *MConnection) String() string {
@ -178,11 +183,11 @@ func (c *MConnection) stopForError(r interface{}) {
// Queues a message to be sent to channel. // Queues a message to be sent to channel.
func (c *MConnection) Send(chId byte, msg interface{}) bool { func (c *MConnection) Send(chId byte, msg interface{}) bool {
if atomic.LoadUint32(&c.stopped) == 1 { if !c.IsRunning() {
return false return false
} }
log.Info("Send", "channel", chId, "connection", c, "msg", msg) //, "bytes", binary.BinaryBytes(msg)) log.Info("Send", "channel", chId, "conn", c, "msg", msg) //, "bytes", binary.BinaryBytes(msg))
// Send message to channel. // Send message to channel.
channel, ok := c.channelsIdx[chId] channel, ok := c.channelsIdx[chId]
@ -199,7 +204,7 @@ func (c *MConnection) Send(chId byte, msg interface{}) bool {
default: default:
} }
} else { } else {
log.Warn("Send failed", "channel", chId, "connection", c, "msg", msg) log.Warn("Send failed", "channel", chId, "conn", c, "msg", msg)
} }
return success return success
} }
@ -207,11 +212,11 @@ func (c *MConnection) Send(chId byte, msg interface{}) bool {
// Queues a message to be sent to channel. // Queues a message to be sent to channel.
// Nonblocking, returns true if successful. // Nonblocking, returns true if successful.
func (c *MConnection) TrySend(chId byte, msg interface{}) bool { func (c *MConnection) TrySend(chId byte, msg interface{}) bool {
if atomic.LoadUint32(&c.stopped) == 1 { if !c.IsRunning() {
return false return false
} }
log.Info("TrySend", "channel", chId, "connection", c, "msg", msg) log.Info("TrySend", "channel", chId, "conn", c, "msg", msg)
// Send message to channel. // Send message to channel.
channel, ok := c.channelsIdx[chId] channel, ok := c.channelsIdx[chId]
@ -233,7 +238,7 @@ func (c *MConnection) TrySend(chId byte, msg interface{}) bool {
} }
func (c *MConnection) CanSend(chId byte) bool { func (c *MConnection) CanSend(chId byte) bool {
if atomic.LoadUint32(&c.stopped) == 1 { if !c.IsRunning() {
return false return false
} }
@ -286,11 +291,11 @@ FOR_LOOP:
} }
} }
if atomic.LoadUint32(&c.stopped) == 1 { if !c.IsRunning() {
break FOR_LOOP break FOR_LOOP
} }
if err != nil { if err != nil {
log.Warn("Connection failed @ sendRoutine", "connection", c, "error", err) log.Warn("Connection failed @ sendRoutine", "conn", c, "error", err)
c.stopForError(err) c.stopForError(err)
break FOR_LOOP break FOR_LOOP
} }
@ -386,8 +391,8 @@ FOR_LOOP:
pktType := binary.ReadByte(c.bufReader, &n, &err) pktType := binary.ReadByte(c.bufReader, &n, &err)
c.recvMonitor.Update(int(n)) c.recvMonitor.Update(int(n))
if err != nil { if err != nil {
if atomic.LoadUint32(&c.stopped) != 1 { if !c.IsRunning() {
log.Warn("Connection failed @ recvRoutine (reading byte)", "connection", c, "error", err) log.Warn("Connection failed @ recvRoutine (reading byte)", "conn", c, "error", err)
c.stopForError(err) c.stopForError(err)
} }
break FOR_LOOP break FOR_LOOP
@ -407,8 +412,9 @@ FOR_LOOP:
binary.ReadBinaryPtr(&pkt, c.bufReader, &n, &err) binary.ReadBinaryPtr(&pkt, c.bufReader, &n, &err)
c.recvMonitor.Update(int(n)) c.recvMonitor.Update(int(n))
if err != nil { if err != nil {
if atomic.LoadUint32(&c.stopped) != 1 { if !c.IsRunning() {
log.Warn("Connection failed @ recvRoutine", "connection", c, "error", err)
log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err)
c.stopForError(err) c.stopForError(err)
} }
break FOR_LOOP break FOR_LOOP
@ -419,8 +425,9 @@ FOR_LOOP:
} }
msgBytes, err := channel.recvMsgPacket(pkt) msgBytes, err := channel.recvMsgPacket(pkt)
if err != nil { if err != nil {
if atomic.LoadUint32(&c.stopped) != 1 { if !c.IsRunning() {
log.Warn("Connection failed @ recvRoutine", "connection", c, "error", err)
log.Warn("Connection failed @ recvRoutine", "conn", c, "error", err)
c.stopForError(err) c.stopForError(err)
} }
break FOR_LOOP break FOR_LOOP

View File

@ -12,9 +12,10 @@ import (
) )
type Peer struct { type Peer struct {
BaseService
outbound bool outbound bool
mconn *MConnection mconn *MConnection
running uint32
*types.NodeInfo *types.NodeInfo
Key string Key string
@ -64,30 +65,20 @@ func newPeer(conn net.Conn, peerNodeInfo *types.NodeInfo, outbound bool, reactor
p = &Peer{ p = &Peer{
outbound: outbound, outbound: outbound,
mconn: mconn, mconn: mconn,
running: 0,
NodeInfo: peerNodeInfo, NodeInfo: peerNodeInfo,
Key: peerNodeInfo.PubKey.KeyString(), Key: peerNodeInfo.PubKey.KeyString(),
Data: NewCMap(), Data: NewCMap(),
} }
p.BaseService = *NewBaseService("Peer", p, p.onStart, p.onStop)
return p return p
} }
func (p *Peer) start() { func (p *Peer) onStart() {
if atomic.CompareAndSwapUint32(&p.running, 0, 1) { p.mconn.Start()
log.Info("Starting Peer", "peer", p)
p.mconn.Start()
}
} }
func (p *Peer) stop() { func (p *Peer) onStop() {
if atomic.CompareAndSwapUint32(&p.running, 1, 0) { p.mconn.Stop()
log.Info("Stopping Peer", "peer", p)
p.mconn.Stop()
}
}
func (p *Peer) IsRunning() bool {
return atomic.LoadUint32(&p.running) == 1
} }
func (p *Peer) Connection() *MConnection { func (p *Peer) Connection() *MConnection {
@ -99,21 +90,21 @@ func (p *Peer) IsOutbound() bool {
} }
func (p *Peer) Send(chId byte, msg interface{}) bool { func (p *Peer) Send(chId byte, msg interface{}) bool {
if atomic.LoadUint32(&p.running) == 0 { if !p.IsRunning() {
return false return false
} }
return p.mconn.Send(chId, msg) return p.mconn.Send(chId, msg)
} }
func (p *Peer) TrySend(chId byte, msg interface{}) bool { func (p *Peer) TrySend(chId byte, msg interface{}) bool {
if atomic.LoadUint32(&p.running) == 0 { if !p.IsRunning() {
return false return false
} }
return p.mconn.TrySend(chId, msg) return p.mconn.TrySend(chId, msg)
} }
func (p *Peer) CanSend(chId byte) bool { func (p *Peer) CanSend(chId byte) bool {
if atomic.LoadUint32(&p.running) == 0 { if !p.IsRunning() {
return false return false
} }
return p.mconn.CanSend(chId) return p.mconn.CanSend(chId)