tendermint/tools/tm-bench/transacter.go
ValarDragon 1dbe7b7e68 tools/tmbench: Improve accuracy with large tx sizes.
At larger tx sizes (e.g. > 10000) we were spending non-negligible
amounts of time in tx creation, due to making the final bytes random.
The slower the send loop, the less accurate it is at measuring the time
tendermint took. (As we can't reach the promised contract of the given rate)

There really isn't much need for that randomness, so this PR makes it
such that only the txNumber gets bumped between txs from the same
connection, thereby improving sendloop speed and accuracy.
2018-07-12 23:47:40 -07:00

285 lines
7.0 KiB
Go

package main
import (
"crypto/md5"
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"math/rand"
"net"
"net/http"
"net/url"
"os"
"sync"
"time"
"github.com/gorilla/websocket"
"github.com/pkg/errors"
"github.com/tendermint/tendermint/libs/log"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
)
// sendTimeout is the offset added to the current time when setting the
// deadline for a websocket write (txs, pings, pongs and the close frame).
const sendTimeout = 10 * time.Second

// pingPeriod is the interval between client pings: nine tenths of 30 seconds.
// NOTE(review): the 30s presumably mirrors the server-side read timeout;
// confirm against the handler linked below.
// see https://github.com/tendermint/tendermint/blob/master/rpc/lib/server/handlers.go
const pingPeriod = 30 * time.Second * 9 / 10
// transacter opens a set of websocket connections to one target and sends
// transactions over each of them at a fixed rate.
type transacter struct {
	// Target is the host the websocket connections are dialed against.
	Target string
	// Rate is the number of txs each connection tries to send per one-second
	// tick, Size is the length in bytes of every generated tx, and Connections
	// is the number of websocket connections to open.
	Rate, Size, Connections int
	// BroadcastTxMethod is the JSON-RPC method name used for every tx request.
	BroadcastTxMethod string

	// conns holds one websocket connection per connection index; connsBroken
	// records, per index, that a write on that connection has failed.
	conns       []*websocket.Conn
	connsBroken []bool

	// startingWg is released once per connection when its send loop has
	// started (or has exited before starting); endingWg is released once per
	// send loop and once per receive loop when they return.
	startingWg, endingWg sync.WaitGroup

	// stopped asks the send and receive loops to wind down.
	// NOTE(review): stopped and connsBroken are read and written from several
	// goroutines and no synchronization is visible in this file.
	stopped bool

	logger log.Logger
}
// newTransacter builds a transacter for the given target. It allocates one
// connection slot and one "broken" flag per requested connection and installs
// a no-op logger; no connection is opened until Start is called.
func newTransacter(target string, connections, rate int, size int, broadcastTxMethod string) *transacter {
	t := new(transacter)

	// user-supplied configuration
	t.Target = target
	t.Rate = rate
	t.Size = size
	t.Connections = connections
	t.BroadcastTxMethod = broadcastTxMethod

	// per-connection state, indexed by connection number
	t.conns = make([]*websocket.Conn, connections)
	t.connsBroken = make([]bool, connections)

	t.logger = log.NewNopLogger()
	return t
}
// SetLogger replaces the logger used by the transacter. newTransacter installs
// a no-op logger, so nothing is logged unless a logger is set here.
func (t *transacter) SetLogger(l log.Logger) {
t.logger = l
}
// Start opens N = `t.Connections` connections to the target and creates read
// and write goroutines for each connection.
//
// It blocks until every send loop has reported that it started (or exited
// before starting). If any dial fails, the connections opened so far are
// closed again, no goroutine is started and the dial error is returned.
func (t *transacter) Start() error {
	t.stopped = false

	rand.Seed(time.Now().Unix())

	for i := 0; i < t.Connections; i++ {
		c, _, err := connect(t.Target)
		if err != nil {
			// Do not leak the sockets that were already established. The
			// close is best effort: the dial error is what the caller needs.
			for j := 0; j < i; j++ {
				_ = t.conns[j].Close()
				t.conns[j] = nil
			}
			return err
		}
		t.conns[i] = c
	}

	// one send loop signals startingWg per connection; both the send and the
	// receive loop signal endingWg when they return
	t.startingWg.Add(t.Connections)
	t.endingWg.Add(2 * t.Connections)
	for i := 0; i < t.Connections; i++ {
		go t.sendLoop(i)
		go t.receiveLoop(i)
	}

	t.startingWg.Wait()

	return nil
}
// Stop closes the connections.
//
// It first flags the loops to stop and waits for all send and receive loops
// to return, then closes every connection that was opened. Slots that were
// never filled (for example after a failed Start) are skipped, and a failure
// to close a connection is logged rather than silently dropped.
func (t *transacter) Stop() {
	t.stopped = true
	t.endingWg.Wait()
	for i, c := range t.conns {
		if c == nil {
			// never dialed; calling Close on it would dereference nil
			continue
		}
		if err := c.Close(); err != nil {
			t.logger.Error(
				fmt.Sprintf("failed to close conn %d", i),
				"err",
				err,
			)
		}
	}
}
// receiveLoop drains the messages arriving on connection connIndex and
// discards them (there are none in case of `broadcast_tx_async`). It returns
// on the first read error, or after a successful read once the transacter has
// been stopped or the connection has been marked broken. endingWg is released
// on return.
func (t *transacter) receiveLoop(connIndex int) {
	conn := t.conns[connIndex]
	defer t.endingWg.Done()

	for {
		if _, _, readErr := conn.ReadMessage(); readErr != nil {
			// A normal closure is the expected end of the conversation;
			// anything else is worth reporting.
			expected := websocket.IsCloseError(readErr, websocket.CloseNormalClosure)
			if !expected {
				t.logger.Error(
					fmt.Sprintf("failed to read response on conn %d", connIndex),
					"err",
					readErr,
				)
			}
			return
		}

		finished := t.stopped || t.connsBroken[connIndex]
		if finished {
			return
		}
	}
}
// sendLoop generates transactions at a given rate.
//
// On every one-second tick it writes up to t.Rate broadcast requests to
// connection connIndex, stopping the batch early once a second has elapsed.
// A single tx buffer is reused for the whole loop; only its tx-number field
// changes between sends. Pings are written every pingPeriod. The loop returns
// after a failed tx write (marking the connection broken) or, once the
// transacter is stopped, after writing a close frame. startingWg is released
// on the first tick, or on return if the loop never got that far; endingWg is
// released on return.
func (t *transacter) sendLoop(connIndex int) {
	started := false
	// Close the starting waitgroup, in the event that this fails to start
	defer func() {
		if !started {
			t.startingWg.Done()
		}
	}()
	c := t.conns[connIndex]

	// answer server pings; a pong that cannot be written because the close
	// frame was already sent, or because of a temporary network error, is not
	// treated as a failure
	c.SetPingHandler(func(message string) error {
		err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(sendTimeout))
		if err == websocket.ErrCloseSent {
			return nil
		} else if e, ok := err.(net.Error); ok && e.Temporary() {
			return nil
		}
		return err
	})

	logger := t.logger.With("addr", c.RemoteAddr())

	var txNumber = 0

	pingsTicker := time.NewTicker(pingPeriod)
	txsTicker := time.NewTicker(1 * time.Second)
	defer func() {
		pingsTicker.Stop()
		txsTicker.Stop()
		t.endingWg.Done()
	}()

	// hash of the host name is a part of each tx
	var hostnameHash [md5.Size]byte
	hostname, err := os.Hostname()
	if err != nil {
		hostname = "127.0.0.1"
	}
	hostnameHash = md5.Sum([]byte(hostname))
	// each transaction embeds connection index, tx number and hash of the hostname
	// we update the tx number between successive txs
	tx := generateTx(connIndex, txNumber, t.Size, hostnameHash)
	txHex := make([]byte, len(tx)*2)
	hex.Encode(txHex, tx)

	for {
		select {
		case <-txsTicker.C:
			startTime := time.Now()
			endTime := startTime.Add(time.Second)
			numTxSent := t.Rate
			if !started {
				t.startingWg.Done()
				started = true
			}

			now := time.Now()
			for i := 0; i < t.Rate; i++ {
				// update tx number of the tx, and the corresponding hex
				updateTx(tx, txHex, txNumber)
				paramsJSON, err := json.Marshal(map[string]interface{}{"tx": txHex})
				if err != nil {
					fmt.Printf("failed to encode params: %v\n", err)
					os.Exit(1)
				}
				rawParamsJSON := json.RawMessage(paramsJSON)

				c.SetWriteDeadline(now.Add(sendTimeout))
				err = c.WriteJSON(rpctypes.RPCRequest{
					JSONRPC: "2.0",
					ID:      "tm-bench",
					Method:  t.BroadcastTxMethod,
					Params:  rawParamsJSON,
				})
				if err != nil {
					err = errors.Wrap(err,
						fmt.Sprintf("txs send failed on connection #%d", connIndex))
					t.connsBroken[connIndex] = true
					logger.Error(err.Error())
					return
				}

				// The tx has been written, so its number is used up. Advance
				// before the deadline check below: leaving the batch without
				// advancing would make the next batch start by re-sending a
				// tx with the same number.
				txNumber++

				// cache the time.Now() reads to save time.
				if i%5 == 0 {
					now = time.Now()
					if now.After(endTime) {
						// Plus one accounts for sending this tx
						numTxSent = i + 1
						break
					}
				}
			}

			timeToSend := time.Since(startTime)
			logger.Info(fmt.Sprintf("sent %d transactions", numTxSent), "took", timeToSend)
			// pad the batch out to a full second so the rate is not exceeded
			if timeToSend < 1*time.Second {
				sleepTime := time.Second - timeToSend
				logger.Debug(fmt.Sprintf("connection #%d is sleeping for %f seconds", connIndex, sleepTime.Seconds()))
				time.Sleep(sleepTime)
			}

		case <-pingsTicker.C:
			// go-rpc server closes the connection in the absence of pings
			c.SetWriteDeadline(time.Now().Add(sendTimeout))
			if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
				err = errors.Wrap(err,
					fmt.Sprintf("failed to write ping message on conn #%d", connIndex))
				logger.Error(err.Error())
				t.connsBroken[connIndex] = true
			}
		}

		if t.stopped {
			// To cleanly close a connection, a client should send a close
			// frame and wait for the server to close the connection.
			c.SetWriteDeadline(time.Now().Add(sendTimeout))
			err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
			if err != nil {
				err = errors.Wrap(err,
					fmt.Sprintf("failed to write close message on conn #%d", connIndex))
				logger.Error(err.Error())
				t.connsBroken[connIndex] = true
			}

			return
		}
	}
}
// connect dials a websocket connection to ws://<host>/websocket using the
// default gorilla dialer and returns whatever the dialer returns.
func connect(host string) (*websocket.Conn, *http.Response, error) {
	endpoint := &url.URL{
		Scheme: "ws",
		Host:   host,
		Path:   "/websocket",
	}
	conn, resp, err := websocket.DefaultDialer.Dial(endpoint.String(), nil)
	return conn, resp, err
}
// generateTx builds a transaction of txSize bytes with the following layout:
//
//	[0:8)   connIndex, uvarint-encoded, remaining bytes zero
//	[8:16)  txNumber, uvarint-encoded, remaining bytes zero
//	[16:32) first 16 bytes of hostnameHash
//	[32:40) current Unix time in seconds, uvarint-encoded, remaining bytes zero
//	[40:)   random bytes
//
// It panics if txSize is smaller than the 40-byte header, or if the random
// bytes cannot be read.
func generateTx(connIndex int, txNumber int, txSize int, hostnameHash [md5.Size]byte) []byte {
	// The fixed-position fields below need this many bytes; fail with a clear
	// message instead of an opaque slice-bounds panic.
	const headerSize = 40
	if txSize < headerSize {
		panic(fmt.Sprintf("tx size must be at least %d bytes, got %d", headerSize, txSize))
	}

	tx := make([]byte, txSize)

	binary.PutUvarint(tx[:8], uint64(connIndex))
	binary.PutUvarint(tx[8:16], uint64(txNumber))
	copy(tx[16:32], hostnameHash[:16])
	binary.PutUvarint(tx[32:40], uint64(time.Now().Unix()))

	// 40-* random data
	if _, err := rand.Read(tx[headerSize:]); err != nil {
		panic(errors.Wrap(err, "failed to read random bytes"))
	}

	return tx
}
// updateTx writes txNumber into the tx-number field of tx (bytes 8..15,
// uvarint-encoded) and refreshes the matching 16 hex characters of txHex
// (bytes 16..31) so that txHex stays the hex encoding of tx.
//
// warning, mutates both input byte slices. tx must be at least 16 bytes long
// and txHex at least 32 bytes long.
func updateTx(tx []byte, txHex []byte, txNumber int) {
	field := tx[8:16]
	// Clear the field first: a uvarint only overwrites as many bytes as it
	// needs, so a shorter encoding would otherwise keep trailing bytes from a
	// previously written longer one.
	for i := range field {
		field[i] = 0
	}
	binary.PutUvarint(field, uint64(txNumber))
	// Encode straight into place; this runs once per tx in the send loop, so
	// avoid allocating a scratch buffer on every call.
	hex.Encode(txHex[16:32], field)
}