mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-25 23:02:16 +00:00
At larger tx sizes (e.g. > 10000) we were spending non-negligible amounts of time in tx creation, due to making the final bytes random. The slower the send loop, the less accurate it is at measuring the time tendermint took (as we can't reach the promised contract of the given rate). There really isn't much need for that randomness, so this PR makes it such that only the txNumber gets bumped between txs from the same connection, thereby improving send loop speed and accuracy.
285 lines
7.0 KiB
Go
285 lines
7.0 KiB
Go
package main
|
|
|
|
import (
|
|
"crypto/md5"
|
|
"encoding/binary"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"math/rand"
|
|
"net"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/tendermint/tendermint/libs/log"
|
|
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
|
|
)
|
|
|
|
// Timing parameters shared by every websocket connection.
const (
	// sendTimeout is the deadline applied to each write on a connection
	// (transactions, pings, pongs and close frames).
	sendTimeout = 10 * time.Second

	// pingPeriod is the interval between client pings: nine tenths of 30 seconds.
	// see https://github.com/tendermint/tendermint/blob/master/rpc/lib/server/handlers.go
	pingPeriod = 30 * time.Second * 9 / 10
)
|
|
|
|
type transacter struct {
|
|
Target string
|
|
Rate int
|
|
Size int
|
|
Connections int
|
|
BroadcastTxMethod string
|
|
|
|
conns []*websocket.Conn
|
|
connsBroken []bool
|
|
startingWg sync.WaitGroup
|
|
endingWg sync.WaitGroup
|
|
stopped bool
|
|
|
|
logger log.Logger
|
|
}
|
|
|
|
func newTransacter(target string, connections, rate int, size int, broadcastTxMethod string) *transacter {
|
|
return &transacter{
|
|
Target: target,
|
|
Rate: rate,
|
|
Size: size,
|
|
Connections: connections,
|
|
BroadcastTxMethod: broadcastTxMethod,
|
|
conns: make([]*websocket.Conn, connections),
|
|
connsBroken: make([]bool, connections),
|
|
logger: log.NewNopLogger(),
|
|
}
|
|
}
|
|
|
|
// SetLogger lets you set your own logger
|
|
func (t *transacter) SetLogger(l log.Logger) {
|
|
t.logger = l
|
|
}
|
|
|
|
// Start opens N = `t.Connections` connections to the target and creates read
|
|
// and write goroutines for each connection.
|
|
func (t *transacter) Start() error {
|
|
t.stopped = false
|
|
|
|
rand.Seed(time.Now().Unix())
|
|
|
|
for i := 0; i < t.Connections; i++ {
|
|
c, _, err := connect(t.Target)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
t.conns[i] = c
|
|
}
|
|
|
|
t.startingWg.Add(t.Connections)
|
|
t.endingWg.Add(2 * t.Connections)
|
|
for i := 0; i < t.Connections; i++ {
|
|
go t.sendLoop(i)
|
|
go t.receiveLoop(i)
|
|
}
|
|
|
|
t.startingWg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Stop closes the connections.
|
|
func (t *transacter) Stop() {
|
|
t.stopped = true
|
|
t.endingWg.Wait()
|
|
for _, c := range t.conns {
|
|
c.Close()
|
|
}
|
|
}
|
|
|
|
// receiveLoop reads messages from the connection (empty in case of
|
|
// `broadcast_tx_async`).
|
|
func (t *transacter) receiveLoop(connIndex int) {
|
|
c := t.conns[connIndex]
|
|
defer t.endingWg.Done()
|
|
for {
|
|
_, _, err := c.ReadMessage()
|
|
if err != nil {
|
|
if !websocket.IsCloseError(err, websocket.CloseNormalClosure) {
|
|
t.logger.Error(
|
|
fmt.Sprintf("failed to read response on conn %d", connIndex),
|
|
"err",
|
|
err,
|
|
)
|
|
}
|
|
return
|
|
}
|
|
if t.stopped || t.connsBroken[connIndex] {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendLoop generates transactions at a given rate.
|
|
func (t *transacter) sendLoop(connIndex int) {
|
|
started := false
|
|
// Close the starting waitgroup, in the event that this fails to start
|
|
defer func() {
|
|
if !started {
|
|
t.startingWg.Done()
|
|
}
|
|
}()
|
|
c := t.conns[connIndex]
|
|
|
|
c.SetPingHandler(func(message string) error {
|
|
err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(sendTimeout))
|
|
if err == websocket.ErrCloseSent {
|
|
return nil
|
|
} else if e, ok := err.(net.Error); ok && e.Temporary() {
|
|
return nil
|
|
}
|
|
return err
|
|
})
|
|
|
|
logger := t.logger.With("addr", c.RemoteAddr())
|
|
|
|
var txNumber = 0
|
|
|
|
pingsTicker := time.NewTicker(pingPeriod)
|
|
txsTicker := time.NewTicker(1 * time.Second)
|
|
defer func() {
|
|
pingsTicker.Stop()
|
|
txsTicker.Stop()
|
|
t.endingWg.Done()
|
|
}()
|
|
|
|
// hash of the host name is a part of each tx
|
|
var hostnameHash [md5.Size]byte
|
|
hostname, err := os.Hostname()
|
|
if err != nil {
|
|
hostname = "127.0.0.1"
|
|
}
|
|
hostnameHash = md5.Sum([]byte(hostname))
|
|
// each transaction embeds connection index, tx number and hash of the hostname
|
|
// we update the tx number between successive txs
|
|
tx := generateTx(connIndex, txNumber, t.Size, hostnameHash)
|
|
txHex := make([]byte, len(tx)*2)
|
|
hex.Encode(txHex, tx)
|
|
|
|
for {
|
|
select {
|
|
case <-txsTicker.C:
|
|
startTime := time.Now()
|
|
endTime := startTime.Add(time.Second)
|
|
numTxSent := t.Rate
|
|
if !started {
|
|
t.startingWg.Done()
|
|
started = true
|
|
}
|
|
|
|
now := time.Now()
|
|
for i := 0; i < t.Rate; i++ {
|
|
// update tx number of the tx, and the corresponding hex
|
|
updateTx(tx, txHex, txNumber)
|
|
paramsJSON, err := json.Marshal(map[string]interface{}{"tx": txHex})
|
|
if err != nil {
|
|
fmt.Printf("failed to encode params: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
rawParamsJSON := json.RawMessage(paramsJSON)
|
|
|
|
c.SetWriteDeadline(now.Add(sendTimeout))
|
|
err = c.WriteJSON(rpctypes.RPCRequest{
|
|
JSONRPC: "2.0",
|
|
ID: "tm-bench",
|
|
Method: t.BroadcastTxMethod,
|
|
Params: rawParamsJSON,
|
|
})
|
|
if err != nil {
|
|
err = errors.Wrap(err,
|
|
fmt.Sprintf("txs send failed on connection #%d", connIndex))
|
|
t.connsBroken[connIndex] = true
|
|
logger.Error(err.Error())
|
|
return
|
|
}
|
|
|
|
// cache the time.Now() reads to save time.
|
|
if i%5 == 0 {
|
|
now = time.Now()
|
|
if now.After(endTime) {
|
|
// Plus one accounts for sending this tx
|
|
numTxSent = i + 1
|
|
break
|
|
}
|
|
}
|
|
|
|
txNumber++
|
|
}
|
|
|
|
timeToSend := time.Since(startTime)
|
|
logger.Info(fmt.Sprintf("sent %d transactions", numTxSent), "took", timeToSend)
|
|
if timeToSend < 1*time.Second {
|
|
sleepTime := time.Second - timeToSend
|
|
logger.Debug(fmt.Sprintf("connection #%d is sleeping for %f seconds", connIndex, sleepTime.Seconds()))
|
|
time.Sleep(sleepTime)
|
|
}
|
|
|
|
case <-pingsTicker.C:
|
|
// go-rpc server closes the connection in the absence of pings
|
|
c.SetWriteDeadline(time.Now().Add(sendTimeout))
|
|
if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
|
|
err = errors.Wrap(err,
|
|
fmt.Sprintf("failed to write ping message on conn #%d", connIndex))
|
|
logger.Error(err.Error())
|
|
t.connsBroken[connIndex] = true
|
|
}
|
|
}
|
|
|
|
if t.stopped {
|
|
// To cleanly close a connection, a client should send a close
|
|
// frame and wait for the server to close the connection.
|
|
c.SetWriteDeadline(time.Now().Add(sendTimeout))
|
|
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
|
|
if err != nil {
|
|
err = errors.Wrap(err,
|
|
fmt.Sprintf("failed to write close message on conn #%d", connIndex))
|
|
logger.Error(err.Error())
|
|
t.connsBroken[connIndex] = true
|
|
}
|
|
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func connect(host string) (*websocket.Conn, *http.Response, error) {
|
|
u := url.URL{Scheme: "ws", Host: host, Path: "/websocket"}
|
|
return websocket.DefaultDialer.Dial(u.String(), nil)
|
|
}
|
|
|
|
func generateTx(connIndex int, txNumber int, txSize int, hostnameHash [md5.Size]byte) []byte {
|
|
tx := make([]byte, txSize)
|
|
|
|
binary.PutUvarint(tx[:8], uint64(connIndex))
|
|
binary.PutUvarint(tx[8:16], uint64(txNumber))
|
|
copy(tx[16:32], hostnameHash[:16])
|
|
binary.PutUvarint(tx[32:40], uint64(time.Now().Unix()))
|
|
|
|
// 40-* random data
|
|
if _, err := rand.Read(tx[40:]); err != nil {
|
|
panic(errors.Wrap(err, "failed to read random bytes"))
|
|
}
|
|
|
|
return tx
|
|
}
|
|
|
|
// updateTx writes txNumber as a uvarint into the tx-number field of tx
// (bytes 8..16) and refreshes the corresponding hex digits of txHex
// (bytes 16..32) so that txHex stays the hex encoding of tx.
//
// Warning: both input slices are mutated in place. tx must be at least 16
// bytes long and txHex at least 32, otherwise the function panics.
func updateTx(tx []byte, txHex []byte, txNumber int) {
	binary.PutUvarint(tx[8:16], uint64(txNumber))
	// Byte k of tx maps to hex digits 2k and 2k+1, so the field can be encoded
	// straight into place. This runs once per transaction in the send loop, so
	// no scratch buffer or byte-by-byte copy is used.
	hex.Encode(txHex[16:32], tx[8:16])
}
|