tendermint/tm-bench/transacter.go
Dev Ojha 48e1227f92 tmbench: Make sendloop act in one second segments (#110)
* tmbench: Make sendloop act in one second segments

Previous behaviour was to make the sendloop send all its messages
and then time how long that took. (Possibly waiting if too fast)
This has the same operation when too fast, but stops the loop
after one second. This is useful if a large tx amount is specified
and the ping doesn't get executed. Tmbench no longer crashes on
large rates.

* Update Readme
2018-06-28 12:11:05 +04:00

236 lines
5.7 KiB
Go

package main
import (
"crypto/md5"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"math/rand"
"net"
"net/http"
"net/url"
"os"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
"github.com/tendermint/tmlibs/log"
)
const (
// sendTimeout bounds every websocket write in this file: it is used as the
// write deadline for transactions, pings, pongs and the close frame.
sendTimeout = 10 * time.Second
// pingPeriod is how often the send loop pings the server; it evaluates to
// 27 seconds (9/10 of 30s).
// NOTE(review): the 30s presumably mirrors the server-side read timeout in
// the handler linked below — confirm.
// see https://github.com/tendermint/go-rpc/blob/develop/server/handlers.go#L313
pingPeriod = (30 * 9 / 10) * time.Second
)
// transacter opens websocket connections to a target node and sends
// transactions over each of them using the configured broadcast RPC method.
type transacter struct {
// Target is the host the websocket connections are dialed against.
Target string
// Rate is the maximum number of transactions each connection's send loop
// writes per one-second batch.
Rate int
// Size is the length in bytes of every generated transaction.
Size int
// Connections is the number of websocket connections opened by Start.
Connections int
// BroadcastTxMethod is the JSON-RPC method name put into each request.
BroadcastTxMethod string
// conns holds one websocket connection per index; it is filled in by Start.
conns []*websocket.Conn
// wg tracks the send and receive goroutines; Stop waits on it.
wg sync.WaitGroup
// stopped is set by Stop and polled by the send and receive loops. It is a
// plain bool accessed from several goroutines without synchronization.
stopped bool
// logger receives diagnostics from the loops; see SetLogger.
logger log.Logger
}
// newTransacter builds a transacter for the given target host.
//
// connections is the number of websocket connections to open, rate the number
// of transactions per second sent on each connection, size the byte length of
// every transaction and broadcastTxMethod the RPC method used to submit them.
// The connection slice is pre-sized (with nil entries) and logging is disabled
// until SetLogger is called.
func newTransacter(target string, connections, rate int, size int, broadcastTxMethod string) *transacter {
	t := new(transacter)

	t.Target = target
	t.Rate = rate
	t.Size = size
	t.Connections = connections
	t.BroadcastTxMethod = broadcastTxMethod

	t.conns = make([]*websocket.Conn, connections)
	t.logger = log.NewNopLogger()

	return t
}
// SetLogger replaces the transacter's logger with l. Until it is called the
// transacter uses the no-op logger installed by newTransacter.
func (t *transacter) SetLogger(l log.Logger) {
t.logger = l
}
// Start opens N = `t.Connections` connections to the target and creates read
// and write goroutines for each connection.
//
// It returns the first dial error encountered. In that case no goroutines are
// started and every connection that had already been opened is closed and
// cleared again, so a failed Start does not leak sockets.
func (t *transacter) Start() error {
	t.stopped = false

	rand.Seed(time.Now().Unix())

	for i := 0; i < t.Connections; i++ {
		c, _, err := connect(t.Target)
		if err != nil {
			// Release the connections opened so far; nobody else will close
			// them because the loops that own them are never started. The
			// close error is ignored: the dial error is the one worth
			// reporting.
			for j := 0; j < i; j++ {
				t.conns[j].Close()
				t.conns[j] = nil
			}
			return err
		}
		t.conns[i] = c
	}

	// One sender and one receiver per connection.
	t.wg.Add(2 * t.Connections)
	for i := 0; i < t.Connections; i++ {
		go t.sendLoop(i)
		go t.receiveLoop(i)
	}

	return nil
}
// Stop signals the send and receive loops to finish, waits for all of them to
// return and then closes the connections.
//
// Slots that hold no connection (Start was never called, or it failed before
// dialing them) are skipped, so Stop is safe to call in those situations too.
func (t *transacter) Stop() {
	t.stopped = true
	t.wg.Wait()
	for _, c := range t.conns {
		if c == nil {
			// Calling Close on a nil *websocket.Conn would panic.
			continue
		}
		// Best effort: the loops have already exited, so a close error is
		// not actionable here.
		c.Close()
	}
}
// receiveLoop drains incoming messages from the connection at connIndex and
// discards their contents.
//
// It returns on the first read error (logging it unless it is a normal
// websocket closure) or, after a successful read, once the transacter has been
// marked as stopped. The wait group is released on exit.
func (t *transacter) receiveLoop(connIndex int) {
	conn := t.conns[connIndex]
	defer t.wg.Done()

	for {
		if _, _, readErr := conn.ReadMessage(); readErr != nil {
			normalClose := websocket.IsCloseError(readErr, websocket.CloseNormalClosure)
			if !normalClose {
				t.logger.Error("failed to read response", "err", readErr)
			}
			return
		}

		// The stop flag is only consulted between reads; a blocked read is
		// ended by the connection being closed.
		if t.stopped {
			return
		}
	}
}
// sendLoop generates transactions at a given rate on the connection at
// connIndex.
//
// Work is done in one-second segments: on every tick it writes up to t.Rate
// transactions, gives up on the batch once the second has elapsed (checked
// every 20th transaction), and sleeps for whatever is left of the second when
// it finishes early. Independently, it pings the server every pingPeriod.
// After each segment or ping it checks the stop flag; once set, it sends a
// normal-closure close frame and returns.
//
// A failure to encode or write a transaction terminates the whole process
// with exit status 1.
func (t *transacter) sendLoop(connIndex int) {
	c := t.conns[connIndex]

	// Answer server pings with a pong; a close already in flight or a
	// temporary network error is not treated as fatal.
	c.SetPingHandler(func(message string) error {
		err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(sendTimeout))
		if err == websocket.ErrCloseSent {
			return nil
		} else if e, ok := err.(net.Error); ok && e.Temporary() {
			return nil
		}
		return err
	})

	logger := t.logger.With("addr", c.RemoteAddr())

	// txNumber counts every transaction written on this connection.
	txNumber := 0

	pingsTicker := time.NewTicker(pingPeriod)
	txsTicker := time.NewTicker(1 * time.Second)
	defer func() {
		pingsTicker.Stop()
		txsTicker.Stop()
		t.wg.Done()
	}()

	// hash of the host name is a part of each tx
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "127.0.0.1"
	}
	hostnameHash := md5.Sum([]byte(hostname))

	for {
		select {
		case <-txsTicker.C:
			startTime := time.Now()
			endTime := startTime.Add(time.Second)
			numTxSent := t.Rate
			for i := 0; i < t.Rate; i++ {
				// each transaction embeds connection index, tx number and hash of the hostname
				tx := generateTx(connIndex, txNumber, t.Size, hostnameHash)
				paramsJSON, err := json.Marshal(map[string]interface{}{"tx": hex.EncodeToString(tx)})
				if err != nil {
					fmt.Printf("failed to encode params: %v\n", err)
					os.Exit(1)
				}
				rawParamsJSON := json.RawMessage(paramsJSON)

				c.SetWriteDeadline(time.Now().Add(sendTimeout))
				err = c.WriteJSON(rpctypes.RPCRequest{
					JSONRPC: "2.0",
					ID:      "tm-bench",
					Method:  t.BroadcastTxMethod,
					Params:  rawParamsJSON,
				})
				if err != nil {
					fmt.Fprintf(os.Stderr, "%v. Try reducing the connections count and increasing the rate.\n", errors.Wrap(err, "txs send failed"))
					os.Exit(1)
				}

				// Count the transaction as soon as it has been written, so
				// that leaving the batch early below cannot make the next
				// batch reuse its number.
				txNumber++

				// Time added here is 7.13 ns/op, not significant enough to worry about
				if i%20 == 0 && time.Now().After(endTime) {
					// Plus one accounts for sending this tx
					numTxSent = i + 1
					break
				}
			}

			// Pad the segment to a full second when the batch was quick.
			timeToSend := time.Since(startTime)
			if timeToSend < 1*time.Second {
				time.Sleep(time.Second - timeToSend)
			}
			logger.Info(fmt.Sprintf("sent %d transactions", numTxSent), "took", timeToSend)
		case <-pingsTicker.C:
			// go-rpc server closes the connection in the absence of pings
			c.SetWriteDeadline(time.Now().Add(sendTimeout))
			if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
				logger.Error("failed to write ping message", "err", err)
			}
		}

		if t.stopped {
			// To cleanly close a connection, a client should send a close
			// frame and wait for the server to close the connection.
			c.SetWriteDeadline(time.Now().Add(sendTimeout))
			err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
			if err != nil {
				logger.Error("failed to write close message", "err", err)
			}

			return
		}
	}
}
// connect dials the websocket endpoint ws://<host>/websocket with the default
// gorilla dialer and no extra request headers. It returns the connection, the
// HTTP handshake response and any dial error unchanged.
func connect(host string) (*websocket.Conn, *http.Response, error) {
	endpoint := (&url.URL{
		Scheme: "ws",
		Host:   host,
		Path:   "/websocket",
	}).String()

	return websocket.DefaultDialer.Dial(endpoint, nil)
}
// generateTx builds a transaction of txSize bytes with the following layout:
//
//	[0:8)   connIndex, uvarint-encoded
//	[8:16)  txNumber, uvarint-encoded
//	[16:32) hostnameHash
//	[32:40) current Unix time in seconds, uvarint-encoded
//	[40:)   random bytes
//
// txSize must be at least 40; smaller values make the fixed-offset slicing
// panic. It also panics if reading random bytes fails.
func generateTx(connIndex int, txNumber int, txSize int, hostnameHash [md5.Size]byte) []byte {
	const (
		connIndexEnd = 8
		txNumberEnd  = 16
		hostHashEnd  = 32
		timestampEnd = 40
	)

	buf := make([]byte, txSize)

	binary.PutUvarint(buf[:connIndexEnd], uint64(connIndex))
	binary.PutUvarint(buf[connIndexEnd:txNumberEnd], uint64(txNumber))
	copy(buf[txNumberEnd:hostHashEnd], hostnameHash[:16])

	now := time.Now().Unix()
	binary.PutUvarint(buf[hostHashEnd:timestampEnd], uint64(now))

	// Everything after the fixed header is filled with random data.
	_, err := rand.Read(buf[timestampEnd:])
	if err != nil {
		panic(errors.Wrap(err, "failed to read random bytes"))
	}

	return buf
}