tendermint/tm-bench/transacter.go

108 lines
2.0 KiB
Go
Raw Normal View History

package main
2017-03-13 19:10:51 +04:00
import (
"encoding/binary"
"encoding/hex"
"fmt"
2017-03-17 14:52:14 +04:00
"math/rand"
2017-03-22 15:53:30 +04:00
"os"
2017-03-13 19:10:51 +04:00
"sync"
"time"
"github.com/pkg/errors"
rpcclient "github.com/tendermint/go-rpc/client"
rpctypes "github.com/tendermint/go-rpc/types"
)
type transacter struct {
2017-03-17 13:13:06 +04:00
Target string
Rate int
Connections int
2017-03-13 19:10:51 +04:00
2017-03-17 13:13:06 +04:00
conns []*rpcclient.WSClient
2017-03-13 19:10:51 +04:00
wg sync.WaitGroup
2017-03-17 13:13:06 +04:00
stopped bool
2017-03-13 19:10:51 +04:00
}
2017-03-17 13:13:06 +04:00
func newTransacter(target string, connections int, rate int) *transacter {
conns := make([]*rpcclient.WSClient, connections)
for i := 0; i < connections; i++ {
conns[i] = rpcclient.NewWSClient(target, "/websocket")
}
return &transacter{
2017-03-17 13:13:06 +04:00
Target: target,
Rate: rate,
Connections: connections,
conns: conns,
}
}
2017-03-13 19:10:51 +04:00
func (t *transacter) Start() error {
t.stopped = false
2017-03-17 13:13:06 +04:00
for _, c := range t.conns {
if _, err := c.Start(); err != nil {
return err
}
}
2017-03-22 15:53:30 +04:00
t.wg.Add(t.Connections)
2017-03-17 13:13:06 +04:00
for i := 0; i < t.Connections; i++ {
go t.sendLoop(i)
2017-03-13 19:10:51 +04:00
}
2017-03-17 13:13:06 +04:00
2017-03-13 19:10:51 +04:00
return nil
}
func (t *transacter) Stop() {
t.stopped = true
t.wg.Wait()
2017-03-17 13:13:06 +04:00
for _, c := range t.conns {
c.Stop()
}
2017-03-13 19:10:51 +04:00
}
2017-03-17 13:13:06 +04:00
func (t *transacter) sendLoop(connIndex int) {
conn := t.conns[connIndex]
2017-03-13 19:10:51 +04:00
2017-03-17 13:13:06 +04:00
var num = 0
2017-03-13 19:10:51 +04:00
for {
startTime := time.Now()
2017-03-17 13:13:06 +04:00
2017-03-13 19:10:51 +04:00
for i := 0; i < t.Rate; i++ {
2017-03-17 13:13:06 +04:00
if t.stopped {
t.wg.Done()
return
}
tx := generateTx(connIndex, num)
err := conn.WriteJSON(rpctypes.RPCRequest{
2017-03-13 19:10:51 +04:00
JSONRPC: "2.0",
ID: "",
Method: "broadcast_tx_async",
Params: []interface{}{hex.EncodeToString(tx)},
})
if err != nil {
2017-03-22 15:53:30 +04:00
fmt.Printf("Lost connection to %s. Please restart the test.\nDetails:\n%v", conn.Address, err)
os.Exit(1)
2017-03-13 19:10:51 +04:00
}
num++
}
timeToSend := time.Now().Sub(startTime)
time.Sleep(time.Second - timeToSend)
2017-03-13 19:10:51 +04:00
}
}
2017-03-17 13:13:06 +04:00
func generateTx(a int, b int) []byte {
2017-03-13 19:10:51 +04:00
tx := make([]byte, 250)
2017-03-17 13:13:06 +04:00
binary.PutUvarint(tx[:32], uint64(a))
binary.PutUvarint(tx[32:64], uint64(b))
2017-03-13 19:10:51 +04:00
if _, err := rand.Read(tx[234:]); err != nil {
panic(errors.Wrap(err, "err reading from crypto/rand"))
}
return tx
}