tendermint/server/server.go

192 lines
4.8 KiB
Go
Raw Normal View History

2015-11-02 07:39:53 -08:00
package server
import (
2015-11-08 15:18:58 -08:00
"bufio"
2015-11-02 07:39:53 -08:00
"fmt"
2015-11-30 20:56:36 -05:00
"io"
2015-11-02 07:39:53 -08:00
"net"
"strings"
"sync"
2015-11-02 07:39:53 -08:00
. "github.com/tendermint/go-common"
"github.com/tendermint/tmsp/types"
)
// var maxNumberConnections = 2
2015-11-29 03:44:08 -05:00
2016-02-21 23:44:33 -08:00
type Server struct {
QuitService
proto string
addr string
listener net.Listener
appMtx sync.Mutex
app types.Application
}
2016-05-18 00:54:32 -04:00
func NewServer(protoAddr string, app types.Application) (Service, error) {
2015-11-02 07:39:53 -08:00
parts := strings.SplitN(protoAddr, "://", 2)
proto, addr := parts[0], parts[1]
2016-02-21 23:44:33 -08:00
s := &Server{
proto: proto,
addr: addr,
listener: nil,
app: app,
}
s.QuitService = *NewQuitService(nil, "TMSPServer", s)
2016-03-20 17:38:03 -07:00
_, err := s.Start() // Just start it
return s, err
2016-02-21 23:44:33 -08:00
}
func (s *Server) OnStart() error {
s.QuitService.OnStart()
ln, err := net.Listen(s.proto, s.addr)
2015-11-02 07:39:53 -08:00
if err != nil {
2016-02-21 23:44:33 -08:00
return err
2015-11-02 07:39:53 -08:00
}
2016-02-21 23:44:33 -08:00
s.listener = ln
go s.acceptConnectionsRoutine()
return nil
}
2015-11-02 07:39:53 -08:00
2016-02-21 23:44:33 -08:00
func (s *Server) OnStop() {
s.QuitService.OnStop()
s.listener.Close()
}
2015-11-29 03:44:08 -05:00
2016-02-21 23:44:33 -08:00
func (s *Server) acceptConnectionsRoutine() {
// semaphore := make(chan struct{}, maxNumberConnections)
2015-11-29 03:44:08 -05:00
2016-02-21 23:44:33 -08:00
for {
// semaphore <- struct{}{}
2016-02-21 23:44:33 -08:00
// Accept a connection
fmt.Println("Waiting for new connection...")
conn, err := s.listener.Accept()
if err != nil {
if !s.IsRunning() {
return // Ignore error from listener closing.
}
Exit("Failed to accept connection: " + err.Error())
} else {
fmt.Println("Accepted a new connection")
2015-11-02 07:39:53 -08:00
}
2016-02-21 23:44:33 -08:00
closeConn := make(chan error, 2) // Push to signal connection closed
responses := make(chan *types.Response, 1000) // A channel to buffer responses
2015-11-02 07:39:53 -08:00
2016-02-21 23:44:33 -08:00
// Read requests from conn and deal with them
go s.handleRequests(closeConn, conn, responses)
// Pull responses from 'responses' and write them to conn.
go s.handleResponses(closeConn, responses, conn)
go func() {
// Wait until signal to close connection
errClose := <-closeConn
if errClose != nil {
fmt.Printf("Connection error: %v\n", errClose)
} else {
fmt.Println("Connection was closed.")
}
// Close the connection
err := conn.Close()
if err != nil {
fmt.Printf("Error in closing connection: %v\n", err)
}
// <-semaphore
}()
}
2015-11-02 07:39:53 -08:00
}
// Read requests from conn and deal with them
2016-02-21 23:44:33 -08:00
func (s *Server) handleRequests(closeConn chan error, conn net.Conn, responses chan<- *types.Response) {
2015-11-02 07:39:53 -08:00
var count int
2015-11-08 15:18:58 -08:00
var bufReader = bufio.NewReader(conn)
2015-11-02 07:39:53 -08:00
for {
2016-01-30 19:36:33 -08:00
var req = &types.Request{}
err := types.ReadMessage(bufReader, req)
2015-11-02 07:39:53 -08:00
if err != nil {
2015-11-30 20:56:36 -05:00
if err == io.EOF {
closeConn <- fmt.Errorf("Connection closed by client")
} else {
2016-05-17 20:06:24 -04:00
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
2015-11-30 20:56:36 -05:00
}
2015-11-02 07:39:53 -08:00
return
}
2016-02-21 23:44:33 -08:00
s.appMtx.Lock()
2015-11-02 07:39:53 -08:00
count++
2016-02-21 23:44:33 -08:00
s.handleRequest(req, responses)
s.appMtx.Unlock()
2015-11-02 07:39:53 -08:00
}
}
2016-02-21 23:44:33 -08:00
func (s *Server) handleRequest(req *types.Request, responses chan<- *types.Response) {
2016-05-17 20:06:24 -04:00
switch r := req.Value.(type) {
2016-05-14 02:22:32 -04:00
case *types.Request_Echo:
responses <- types.ToResponseEcho(r.Echo.Message)
case *types.Request_Flush:
responses <- types.ToResponseFlush()
case *types.Request_Info:
2016-02-21 23:44:33 -08:00
data := s.app.Info()
2016-05-14 02:22:32 -04:00
responses <- types.ToResponseInfo(data)
case *types.Request_SetOption:
so := r.SetOption
logStr := s.app.SetOption(so.Key, so.Value)
responses <- types.ToResponseSetOption(logStr)
case *types.Request_AppendTx:
res := s.app.AppendTx(r.AppendTx.Tx)
responses <- types.ToResponseAppendTx(res.Code, res.Data, res.Log)
case *types.Request_CheckTx:
res := s.app.CheckTx(r.CheckTx.Tx)
responses <- types.ToResponseCheckTx(res.Code, res.Data, res.Log)
case *types.Request_Commit:
2016-03-23 02:50:29 -07:00
res := s.app.Commit()
2016-05-14 02:22:32 -04:00
responses <- types.ToResponseCommit(res.Code, res.Data, res.Log)
case *types.Request_Query:
res := s.app.Query(r.Query.Query)
responses <- types.ToResponseQuery(res.Code, res.Data, res.Log)
case *types.Request_InitChain:
if app, ok := s.app.(types.BlockchainAware); ok {
2016-05-14 02:22:32 -04:00
app.InitChain(r.InitChain.Validators)
responses <- types.ToResponseInitChain()
2016-02-28 19:19:29 -08:00
} else {
2016-05-14 02:22:32 -04:00
responses <- types.ToResponseInitChain()
2016-02-28 19:19:29 -08:00
}
2016-05-14 02:22:32 -04:00
case *types.Request_EndBlock:
if app, ok := s.app.(types.BlockchainAware); ok {
2016-05-14 02:22:32 -04:00
validators := app.EndBlock(r.EndBlock.Height)
responses <- types.ToResponseEndBlock(validators)
2016-02-28 19:19:29 -08:00
} else {
2016-05-14 02:22:32 -04:00
responses <- types.ToResponseEndBlock(nil)
2016-02-28 19:19:29 -08:00
}
2015-11-02 07:39:53 -08:00
default:
2016-05-14 02:22:32 -04:00
responses <- types.ToResponseException("Unknown request")
2015-11-02 07:39:53 -08:00
}
}
// Pull responses from 'responses' and write them to conn.
2016-02-21 23:44:33 -08:00
func (s *Server) handleResponses(closeConn chan error, responses <-chan *types.Response, conn net.Conn) {
2015-11-02 07:39:53 -08:00
var count int
2015-11-08 15:18:58 -08:00
var bufWriter = bufio.NewWriter(conn)
2015-11-02 07:39:53 -08:00
for {
var res = <-responses
2016-01-30 19:36:33 -08:00
err := types.WriteMessage(res, bufWriter)
2015-11-02 07:39:53 -08:00
if err != nil {
2016-05-17 20:06:24 -04:00
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
2015-11-02 07:39:53 -08:00
return
}
2016-05-17 20:06:24 -04:00
if _, ok := res.Value.(*types.Response_Flush); ok {
2015-11-08 15:18:58 -08:00
err = bufWriter.Flush()
if err != nil {
2016-05-17 20:06:24 -04:00
closeConn <- fmt.Errorf("Error in handleValue: %v", err.Error())
2015-11-08 15:18:58 -08:00
return
}
}
2015-11-02 07:39:53 -08:00
count++
}
}