s/StartListener/NewServer/g

This commit is contained in:
Jae Kwon 2016-02-21 23:44:33 -08:00
parent 90e38f08f4
commit 2c1aa7af2b
4 changed files with 90 additions and 60 deletions

View File

@ -16,7 +16,7 @@ func main() {
app := counter.NewCounterApplication(*serialPtr) app := counter.NewCounterApplication(*serialPtr)
// Start the listener // Start the listener
_, err := server.StartListener(*addrPtr, app) _, err := server.NewServer(*addrPtr, app)
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} }

View File

@ -14,7 +14,7 @@ func main() {
flag.Parse() flag.Parse()
// Start the listener // Start the listener
_, err := server.StartListener(*addrPtr, dummy.NewDummyApplication()) _, err := server.NewServer(*addrPtr, dummy.NewDummyApplication())
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} }

View File

@ -14,7 +14,7 @@ func TestStream(t *testing.T) {
numAppendTxs := 200000 numAppendTxs := 200000
// Start the listener // Start the listener
_, err := server.StartListener("tcp://127.0.0.1:46658", NewDummyApplication()) _, err := server.NewServer("tcp://127.0.0.1:46658", NewDummyApplication())
if err != nil { if err != nil {
Exit(err.Error()) Exit(err.Error())
} }

View File

@ -14,65 +14,95 @@ import (
// var maxNumberConnections = 2 // var maxNumberConnections = 2
func StartListener(protoAddr string, app types.Application) (net.Listener, error) { type Server struct {
var mtx sync.Mutex // global mutex QuitService
proto string
addr string
listener net.Listener
appMtx sync.Mutex
app types.Application
}
func NewServer(protoAddr string, app types.Application) (*Server, error) {
parts := strings.SplitN(protoAddr, "://", 2) parts := strings.SplitN(protoAddr, "://", 2)
proto, addr := parts[0], parts[1] proto, addr := parts[0], parts[1]
ln, err := net.Listen(proto, addr) s := &Server{
if err != nil { proto: proto,
return nil, err addr: addr,
listener: nil,
app: app,
} }
s.QuitService = *NewQuitService(nil, "TMSPServer", s)
s.Start() // Just start it
return s, nil
}
// A goroutine to accept a connection. func (s *Server) OnStart() error {
go func() { s.QuitService.OnStart()
// semaphore := make(chan struct{}, maxNumberConnections) ln, err := net.Listen(s.proto, s.addr)
if err != nil {
return err
}
s.listener = ln
go s.acceptConnectionsRoutine()
return nil
}
for { func (s *Server) OnStop() {
// semaphore <- struct{}{} s.QuitService.OnStop()
s.listener.Close()
}
// Accept a connection func (s *Server) acceptConnectionsRoutine() {
fmt.Println("Waiting for new connection...") // semaphore := make(chan struct{}, maxNumberConnections)
conn, err := ln.Accept()
if err != nil { for {
Exit("Failed to accept connection") // semaphore <- struct{}{}
} else {
fmt.Println("Accepted a new connection") // Accept a connection
fmt.Println("Waiting for new connection...")
conn, err := s.listener.Accept()
if err != nil {
if !s.IsRunning() {
return // Ignore error from listener closing.
} }
Exit("Failed to accept connection: " + err.Error())
closeConn := make(chan error, 2) // Push to signal connection closed } else {
responses := make(chan *types.Response, 1000) // A channel to buffer responses fmt.Println("Accepted a new connection")
// Read requests from conn and deal with them
go handleRequests(&mtx, app, closeConn, conn, responses)
// Pull responses from 'responses' and write them to conn.
go handleResponses(closeConn, responses, conn)
go func() {
// Wait until signal to close connection
errClose := <-closeConn
if errClose != nil {
fmt.Printf("Connection error: %v\n", errClose)
} else {
fmt.Println("Connection was closed.")
}
// Close the connection
err := conn.Close()
if err != nil {
fmt.Printf("Error in closing connection: %v\n", err)
}
// <-semaphore
}()
} }
}() closeConn := make(chan error, 2) // Push to signal connection closed
responses := make(chan *types.Response, 1000) // A channel to buffer responses
return ln, nil // Read requests from conn and deal with them
go s.handleRequests(closeConn, conn, responses)
// Pull responses from 'responses' and write them to conn.
go s.handleResponses(closeConn, responses, conn)
go func() {
// Wait until signal to close connection
errClose := <-closeConn
if errClose != nil {
fmt.Printf("Connection error: %v\n", errClose)
} else {
fmt.Println("Connection was closed.")
}
// Close the connection
err := conn.Close()
if err != nil {
fmt.Printf("Error in closing connection: %v\n", err)
}
// <-semaphore
}()
}
} }
// Read requests from conn and deal with them // Read requests from conn and deal with them
func handleRequests(mtx *sync.Mutex, app types.Application, closeConn chan error, conn net.Conn, responses chan<- *types.Response) { func (s *Server) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
var count int var count int
var bufReader = bufio.NewReader(conn) var bufReader = bufio.NewReader(conn)
for { for {
@ -87,36 +117,36 @@ func handleRequests(mtx *sync.Mutex, app types.Application, closeConn chan error
} }
return return
} }
mtx.Lock() s.appMtx.Lock()
count++ count++
handleRequest(app, req, responses) s.handleRequest(req, responses)
mtx.Unlock() s.appMtx.Unlock()
} }
} }
func handleRequest(app types.Application, req *types.Request, responses chan<- *types.Response) { func (s *Server) handleRequest(req *types.Request, responses chan<- *types.Response) {
switch req.Type { switch req.Type {
case types.MessageType_Echo: case types.MessageType_Echo:
responses <- types.ResponseEcho(string(req.Data)) responses <- types.ResponseEcho(string(req.Data))
case types.MessageType_Flush: case types.MessageType_Flush:
responses <- types.ResponseFlush() responses <- types.ResponseFlush()
case types.MessageType_Info: case types.MessageType_Info:
data := app.Info() data := s.app.Info()
responses <- types.ResponseInfo(data) responses <- types.ResponseInfo(data)
case types.MessageType_SetOption: case types.MessageType_SetOption:
logStr := app.SetOption(req.Key, req.Value) logStr := s.app.SetOption(req.Key, req.Value)
responses <- types.ResponseSetOption(logStr) responses <- types.ResponseSetOption(logStr)
case types.MessageType_AppendTx: case types.MessageType_AppendTx:
code, result, logStr := app.AppendTx(req.Data) code, result, logStr := s.app.AppendTx(req.Data)
responses <- types.ResponseAppendTx(code, result, logStr) responses <- types.ResponseAppendTx(code, result, logStr)
case types.MessageType_CheckTx: case types.MessageType_CheckTx:
code, result, logStr := app.CheckTx(req.Data) code, result, logStr := s.app.CheckTx(req.Data)
responses <- types.ResponseCheckTx(code, result, logStr) responses <- types.ResponseCheckTx(code, result, logStr)
case types.MessageType_Commit: case types.MessageType_Commit:
hash, logStr := app.Commit() hash, logStr := s.app.Commit()
responses <- types.ResponseCommit(hash, logStr) responses <- types.ResponseCommit(hash, logStr)
case types.MessageType_Query: case types.MessageType_Query:
code, result, logStr := app.Query(req.Data) code, result, logStr := s.app.Query(req.Data)
responses <- types.ResponseQuery(code, result, logStr) responses <- types.ResponseQuery(code, result, logStr)
default: default:
responses <- types.ResponseException("Unknown request") responses <- types.ResponseException("Unknown request")
@ -124,7 +154,7 @@ func handleRequest(app types.Application, req *types.Request, responses chan<- *
} }
// Pull responses from 'responses' and write them to conn. // Pull responses from 'responses' and write them to conn.
func handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) { func (s *Server) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) {
var count int var count int
var bufWriter = bufio.NewWriter(conn) var bufWriter = bufio.NewWriter(conn)
for { for {