rpc: broadcast tests. closes #219

This commit is contained in:
Ethan Buchman 2016-07-05 14:41:50 -04:00
parent 65ebc344ac
commit 7e3e9ee9d2
4 changed files with 97 additions and 26 deletions

View File

@ -60,7 +60,7 @@ type Mempool struct {
// Keep a cache of already-seen txs. // Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp. // This reduces the pressure on the proxyApp.
cacheMap map[string]struct{} cacheMap map[string]struct{}
cacheList *list.List cacheList *list.List // to remove oldest tx when cache gets too big
} }
func NewMempool(config cfg.Config, proxyAppConn proxy.AppConn) *Mempool { func NewMempool(config cfg.Config, proxyAppConn proxy.AppConn) *Mempool {
@ -81,6 +81,7 @@ func NewMempool(config cfg.Config, proxyAppConn proxy.AppConn) *Mempool {
return mempool return mempool
} }
// consensus must be able to hold lock to safely update
func (mem *Mempool) Lock() { func (mem *Mempool) Lock() {
mem.proxyMtx.Lock() mem.proxyMtx.Lock()
} }
@ -89,10 +90,25 @@ func (mem *Mempool) Unlock() {
mem.proxyMtx.Unlock() mem.proxyMtx.Unlock()
} }
// Number of transactions in the mempool clist
func (mem *Mempool) Size() int { func (mem *Mempool) Size() int {
return mem.txs.Len() return mem.txs.Len()
} }
// Remove all transactions from mempool and cache
func (mem *Mempool) Flush() {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
mem.cacheMap = make(map[string]struct{}, cacheSize)
mem.cacheList.Init()
for e := mem.txs.Front(); e != nil; e = e.Next() {
mem.txs.Remove(e)
e.DetachPrev()
}
}
// Return the first element of mem.txs for peer goroutines to call .NextWait() on. // Return the first element of mem.txs for peer goroutines to call .NextWait() on.
// Blocks until txs has elements. // Blocks until txs has elements.
func (mem *Mempool) TxsFrontWait() *clist.CElement { func (mem *Mempool) TxsFrontWait() *clist.CElement {
@ -125,6 +141,8 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) {
if mem.cacheList.Len() >= cacheSize { if mem.cacheList.Len() >= cacheSize {
popped := mem.cacheList.Front() popped := mem.cacheList.Front()
poppedTx := popped.Value.(types.Tx) poppedTx := popped.Value.(types.Tx)
// NOTE: the tx may have already been removed from the map
// but deleting a non-existant element is fine
delete(mem.cacheMap, string(poppedTx)) delete(mem.cacheMap, string(poppedTx))
mem.cacheList.Remove(popped) mem.cacheList.Remove(popped)
} }
@ -146,6 +164,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*tmsp.Response)) (err error) {
func (mem *Mempool) removeTxFromCacheMap(tx []byte) { func (mem *Mempool) removeTxFromCacheMap(tx []byte) {
mem.proxyMtx.Lock() mem.proxyMtx.Lock()
// NOTE tx not removed from cacheList
delete(mem.cacheMap, string(tx)) delete(mem.cacheMap, string(tx))
mem.proxyMtx.Unlock() mem.proxyMtx.Unlock()

View File

@ -46,7 +46,7 @@ func BroadcastTxSync(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {
// Else, block until the tx is included in a block, // Else, block until the tx is included in a block,
// and return the result of AppendTx (with no error). // and return the result of AppendTx (with no error).
// Even if AppendTx fails, so long as the tx is included in a block this function // Even if AppendTx fails, so long as the tx is included in a block this function
// will not return an error. // will not return an error - it is the caller's responsibility to check res.Code.
// The function times out after five minutes and returns the result of CheckTx and an error. // The function times out after five minutes and returns the result of CheckTx and an error.
// TODO: smarter timeout logic or someway to cancel (tx not getting committed is a sign of a larger problem!) // TODO: smarter timeout logic or someway to cancel (tx not getting committed is a sign of a larger problem!)
func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) { func BroadcastTxCommit(tx types.Tx) (*ctypes.ResultBroadcastTx, error) {

View File

@ -1,11 +1,13 @@
package rpctest package rpctest
import ( import (
"bytes"
"fmt" "fmt"
"testing" "testing"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
tmsp "github.com/tendermint/tmsp/types"
) )
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
@ -42,33 +44,84 @@ func testStatus(t *testing.T, statusI interface{}) {
} }
} }
// TODO //--------------------------------------------------------------------------------
/* // broadcast tx sync
func testBroadcastTx(t *testing.T, typ string) {
amt := int64(100) var testTx = []byte{0x1, 0x2, 0x3, 0x4, 0x5}
toAddr := user[1].Address
tx := makeDefaultSendTxSigned(t, typ, toAddr, amt) func TestURIBroadcastTxSync(t *testing.T) {
receipt := broadcastTx(t, typ, tx) config.Set("block_size", 0)
if receipt.CreatesContract > 0 { defer config.Set("block_size", -1)
t.Fatal("This tx does not create a contract") tmResult := new(ctypes.TMResult)
_, err := clientURI.Call("broadcast_tx_sync", map[string]interface{}{"tx": testTx}, tmResult)
if err != nil {
t.Fatal(err)
} }
if len(receipt.TxHash) == 0 { testBroadcastTxSync(t, tmResult)
t.Fatal("Failed to compute tx hash")
} }
pool := node.MempoolReactor().Mempool
txs := pool.GetProposalTxs() func TestJSONBroadcastTxSync(t *testing.T) {
if len(txs) != mempoolCount { config.Set("block_size", 0)
t.Fatalf("The mem pool has %d txs. Expected %d", len(txs), mempoolCount) defer config.Set("block_size", -1)
tmResult := new(ctypes.TMResult)
_, err := clientJSON.Call("broadcast_tx_sync", []interface{}{testTx}, tmResult)
if err != nil {
t.Fatal(err)
}
testBroadcastTxSync(t, tmResult)
}
func testBroadcastTxSync(t *testing.T, resI interface{}) {
tmRes := resI.(*ctypes.TMResult)
res := (*tmRes).(*ctypes.ResultBroadcastTx)
if res.Code != tmsp.CodeType_OK {
t.Fatalf("BroadcastTxSync got non-zero exit code: %v. %X; %s", res.Code, res.Data, res.Log)
}
mem := node.MempoolReactor().Mempool
if mem.Size() != 1 {
t.Fatalf("Mempool size should have been 1. Got %d", mem.Size())
}
txs := mem.Reap(1)
if !bytes.Equal(txs[0], testTx) {
t.Fatalf("Tx in mempool does not match test tx. Got %X, expected %X", txs[0], testTx)
}
mem.Flush()
}
//--------------------------------------------------------------------------------
// broadcast tx commit
func TestURIBroadcastTxCommit(t *testing.T) {
tmResult := new(ctypes.TMResult)
_, err := clientURI.Call("broadcast_tx_commit", map[string]interface{}{"tx": testTx}, tmResult)
if err != nil {
t.Fatal(err)
}
testBroadcastTxCommit(t, tmResult)
}
func TestJSONBroadcastTxCommit(t *testing.T) {
tmResult := new(ctypes.TMResult)
_, err := clientJSON.Call("broadcast_tx_commit", []interface{}{testTx}, tmResult)
if err != nil {
t.Fatal(err)
}
testBroadcastTxCommit(t, tmResult)
}
func testBroadcastTxCommit(t *testing.T, resI interface{}) {
tmRes := resI.(*ctypes.TMResult)
res := (*tmRes).(*ctypes.ResultBroadcastTx)
if res.Code != tmsp.CodeType_OK {
t.Fatalf("BroadcastTxCommit got non-zero exit code: %v. %X; %s", res.Code, res.Data, res.Log)
}
mem := node.MempoolReactor().Mempool
if mem.Size() != 0 {
t.Fatalf("Mempool size should have been 0. Got %s", mem.Size())
} }
tx2 := txs[mempoolCount-1].(*types.SendTx)
n, err := new(int64), new(error)
buf1, buf2 := new(bytes.Buffer), new(bytes.Buffer)
tx.WriteSignBytes(chainID, buf1, n, err)
tx2.WriteSignBytes(chainID, buf2, n, err)
if bytes.Compare(buf1.Bytes(), buf2.Bytes()) != 0 {
t.Fatal("inconsistent hashes for mempool tx and sent tx")
} }
}*/
//-------------------------------------------------------------------------------- //--------------------------------------------------------------------------------
// Test the websocket service // Test the websocket service

View File

@ -20,7 +20,6 @@ import (
var ( var (
config cfg.Config config cfg.Config
node *nm.Node node *nm.Node
mempoolCount = 0
chainID string chainID string
rpcAddr string rpcAddr string
requestAddr string requestAddr string