Mirror of https://github.com/fluencelabs/tendermint (synced 2025-06-28 04:01:40 +00:00)
Make order be decided first; Remove TMSP Commit/Rollback
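In summary: the mempool now only checks transactions (AppendTx becomes CheckTx against a proxy.AppConn), consensus decides the transaction order first, and Update(height, txs) afterwards tells the mempool which txs were committed; the mempool no longer issues TMSP Commit/Rollback and Reap() no longer returns an app-state hash. The sketch below is an illustrative recap of the resulting method shapes, assembled from the diff that follows and reusing the package's existing imports; the interface declaration itself does not exist in the package.

// Illustrative recap of the Mempool API after this commit (assembled from
// the diff below; this interface is not a declaration in the package).
type mempoolAPI interface {
	CheckTx(tx types.Tx) error               // was AppendTx(); feeds proxyAppConn.CheckTxAsync()
	Reap() ([]types.Tx, error)               // no longer returns an app-state hash
	Update(height int, txs []types.Tx) error // was Update(block *types.Block); drops committed txs
}

// Compile-time check that *Mempool, as changed below, has these shapes.
var _ mempoolAPI = (*Mempool)(nil)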
@@ -1,7 +1,6 @@
 package mempool
 
 import (
-	"bytes"
 	"container/list"
 	"sync"
 	"sync/atomic"
@@ -15,7 +14,7 @@ import (
 
 /*
 
-The mempool pushes new txs onto the proxyAppCtx.
+The mempool pushes new txs onto the proxyAppConn.
 It gets a stream of (req, res) tuples from the proxy.
 The memool stores good txs in a concurrent linked-list.
 
@@ -24,14 +23,14 @@ safely by calling .NextWait() on each element.
 
 So we have several go-routines:
 1. Consensus calling Update() and Reap() synchronously
-2. Many mempool reactor's peer routines calling AppendTx()
+2. Many mempool reactor's peer routines calling CheckTx()
 3. Many mempool reactor's peer routines traversing the txs linked list
 4. Another goroutine calling GarbageCollectTxs() periodically
 
 To manage these goroutines, there are three methods of locking.
 1. Mutations to the linked-list is protected by an internal mtx (CList is goroutine-safe)
 2. Mutations to the linked-list elements are atomic
-3. AppendTx() calls can be paused upon Update() and Reap(), protected by .proxyMtx
+3. CheckTx() calls can be paused upon Update() and Reap(), protected by .proxyMtx
 
 Garbage collection of old elements from mempool.txs is handlde via
 the DetachPrev() call, which makes old elements not reachable by
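Of the three locking methods, the third is the one this file leans on most: CheckTx() from peer goroutines and Update()/Reap() from consensus all take the same proxyMtx, so transaction admission simply pauses while the mempool is being updated. A minimal standalone sketch of that pattern follows; the miniMempool type is hypothetical and is not code from this commit.

import "sync"

// Hypothetical miniature of locking method 3 above: peer goroutines calling
// CheckTx block on the same mutex that consensus takes in Update(), so
// admission pauses while committed txs are being filtered out.
type miniMempool struct {
	proxyMtx sync.Mutex
	txs      [][]byte
}

func (mem *miniMempool) CheckTx(tx []byte) {
	mem.proxyMtx.Lock()
	defer mem.proxyMtx.Unlock()
	mem.txs = append(mem.txs, tx) // stand-in for CheckTxAsync + resCb
}

func (mem *miniMempool) Update(committed map[string]struct{}) {
	mem.proxyMtx.Lock()
	defer mem.proxyMtx.Unlock()
	kept := mem.txs[:0]
	for _, tx := range mem.txs {
		if _, ok := committed[string(tx)]; !ok {
			kept = append(kept, tx) // keep txs that were not committed
		}
	}
	mem.txs = kept
}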
@@ -42,12 +41,11 @@ peer broadcastTxRoutine() automatically garbage collected.
 const cacheSize = 100000
 
 type Mempool struct {
-	proxyMtx    sync.Mutex
-	proxyAppCtx proxy.AppContext
-	txs         *clist.CList    // concurrent linked-list of good txs
-	counter     int64           // simple incrementing counter
-	height      int             // the last block Update()'d to
-	expected    *clist.CElement // pointer to .txs for next response
+	proxyMtx     sync.Mutex
+	proxyAppConn proxy.AppConn
+	txs          *clist.CList // concurrent linked-list of good txs
+	counter      int64        // simple incrementing counter
+	height       int          // the last block Update()'d to
 
 	// Keep a cache of already-seen txs.
 	// This reduces the pressure on the proxyApp.
@@ -55,18 +53,17 @@ type Mempool struct {
 	cacheList *list.List
 }
 
-func NewMempool(proxyAppCtx proxy.AppContext) *Mempool {
+func NewMempool(proxyAppConn proxy.AppConn) *Mempool {
 	mempool := &Mempool{
-		proxyAppCtx: proxyAppCtx,
-		txs:         clist.New(),
-		counter:     0,
-		height:      0,
-		expected:    nil,
+		proxyAppConn: proxyAppConn,
+		txs:          clist.New(),
+		counter:      0,
+		height:       0,
 
 		cacheMap:  make(map[string]struct{}, cacheSize),
 		cacheList: list.New(),
 	}
-	proxyAppCtx.SetResponseCallback(mempool.resCb)
+	proxyAppConn.SetResponseCallback(mempool.resCb)
 	return mempool
 }
 
@@ -78,7 +75,7 @@ func (mem *Mempool) TxsFrontWait() *clist.CElement {
 
 // Try a new transaction in the mempool.
 // Potentially blocking if we're blocking on Update() or Reap().
-func (mem *Mempool) AppendTx(tx types.Tx) (err error) {
+func (mem *Mempool) CheckTx(tx types.Tx) (err error) {
 	mem.proxyMtx.Lock()
 	defer mem.proxyMtx.Unlock()
 
@@ -96,70 +93,43 @@ func (mem *Mempool) AppendTx(tx types.Tx) (err error) {
 	mem.cacheList.PushBack(tx)
 	// END CACHE
 
-	if err = mem.proxyAppCtx.Error(); err != nil {
+	if err = mem.proxyAppConn.Error(); err != nil {
 		return err
 	}
-	mem.proxyAppCtx.AppendTxAsync(tx)
+	mem.proxyAppConn.CheckTxAsync(tx)
 	return nil
 }
 
 // TMSP callback function
-// CONTRACT: No other goroutines mutate mem.expected concurrently.
 func (mem *Mempool) resCb(req tmsp.Request, res tmsp.Response) {
 	switch res := res.(type) {
-	case tmsp.ResponseAppendTx:
-		reqAppendTx := req.(tmsp.RequestAppendTx)
-		if mem.expected == nil { // Normal operation
-			if res.RetCode == tmsp.RetCodeOK {
-				mem.counter++
-				memTx := &mempoolTx{
-					counter: mem.counter,
-					height:  int64(mem.height),
-					tx:      reqAppendTx.TxBytes,
-				}
-				mem.txs.PushBack(memTx)
-			} else {
-				// ignore bad transaction
-				// TODO: handle other retcodes
-			}
-		} else { // During Update()
-			// TODO Log sane warning if mem.expected is nil.
-			memTx := mem.expected.Value.(*mempoolTx)
-			if !bytes.Equal(reqAppendTx.TxBytes, memTx.tx) {
-				PanicSanity("Unexpected tx response from proxy")
-			}
-			if res.RetCode == tmsp.RetCodeOK {
-				// Good, nothing to do.
-			} else {
-				// TODO: handle other retcodes
-				// Tx became invalidated due to newly committed block.
-				// NOTE: Concurrent traversal of mem.txs via CElement.Next() still works.
-				mem.txs.Remove(mem.expected)
-				mem.expected.DetachPrev()
-			}
-			mem.expected = mem.expected.Next()
+	case tmsp.ResponseCheckTx:
+		reqCheckTx := req.(tmsp.RequestCheckTx)
+		if res.RetCode == tmsp.RetCodeOK {
+			mem.counter++
+			memTx := &mempoolTx{
+				counter: mem.counter,
+				height:  int64(mem.height),
+				tx:      reqCheckTx.TxBytes,
+			}
+			mem.txs.PushBack(memTx)
+		} else {
+			// ignore bad transaction
+			// TODO: handle other retcodes
 		}
 	default:
 		// ignore other messages
 	}
 }
 
-// Get the valid transactions run so far, and the hash of
-// the application state that results from those transactions.
-func (mem *Mempool) Reap() ([]types.Tx, []byte, error) {
+// Get the valid transactions remaining
+func (mem *Mempool) Reap() ([]types.Tx, error) {
 	mem.proxyMtx.Lock()
 	defer mem.proxyMtx.Unlock()
 
-	// First, get the hash of txs run so far
-	hash, err := mem.proxyAppCtx.GetHashSync()
-	if err != nil {
-		return nil, nil, err
-	}
-
 	// And collect all the transactions.
 	txs := mem.collectTxs()
 
-	return txs, hash, nil
+	return txs, nil
 }
 
 func (mem *Mempool) collectTxs() []types.Tx {
@@ -171,54 +141,29 @@ func (mem *Mempool) collectTxs() []types.Tx {
 	return txs
 }
 
-// "block" is the new block that was committed.
-// Txs that are present in "block" are discarded from mempool.
+// Tell mempool that these txs were committed.
+// Mempool will discard these txs.
 // NOTE: this should be called *after* block is committed by consensus.
-// CONTRACT: block is valid and next in sequence.
-func (mem *Mempool) Update(block *types.Block) error {
+func (mem *Mempool) Update(height int, txs []types.Tx) error {
 	mem.proxyMtx.Lock()
 	defer mem.proxyMtx.Unlock()
 
-	// Rollback mempool synchronously
-	// TODO: test that proxyAppCtx's state matches the block's
-	err := mem.proxyAppCtx.RollbackSync()
-	if err != nil {
-		return err
+	// First, create a lookup map of txns in new txs.
+	txsMap := make(map[string]struct{})
+	for _, tx := range txs {
+		txsMap[string(tx)] = struct{}{}
 	}
 
-	// First, create a lookup map of txns in new block.
-	blockTxsMap := make(map[string]struct{})
-	for _, tx := range block.Data.Txs {
-		blockTxsMap[string(tx)] = struct{}{}
-	}
+	// Set height
+	mem.height = height
 
-	// Remove transactions that are already in block.
-	// Return the remaining potentially good txs.
-	goodTxs := mem.filterTxs(block.Height, blockTxsMap)
-
-	// Set height and expected
-	mem.height = block.Height
-	mem.expected = mem.txs.Front()
-
-	// Push good txs to proxyAppCtx
-	// NOTE: resCb() may be called concurrently.
-	for _, tx := range goodTxs {
-		mem.proxyAppCtx.AppendTxAsync(tx)
-		if err := mem.proxyAppCtx.Error(); err != nil {
-			return err
-		}
-	}
-
-	// NOTE: Even though we return immediately without e.g.
-	// calling mem.proxyAppCtx.FlushSync(),
-	// New mempool txs will still have to wait until
-	// all goodTxs are re-processed.
-	// So we could make synchronous calls here to proxyAppCtx.
+	// Remove transactions that are already in txs.
+	mem.filterTxs(txsMap)
 
 	return nil
 }
 
-func (mem *Mempool) filterTxs(height int, blockTxsMap map[string]struct{}) []types.Tx {
+func (mem *Mempool) filterTxs(blockTxsMap map[string]struct{}) []types.Tx {
 	goodTxs := make([]types.Tx, 0, mem.txs.Len())
 	for e := mem.txs.Front(); e != nil; e = e.Next() {
 		memTx := e.Value.(*mempoolTx)
@@ -229,7 +174,6 @@ func (mem *Mempool) filterTxs(height int, blockTxsMap map[string]struct{}) []types.Tx {
 			continue
 		}
 		// Good tx!
-		atomic.StoreInt64(&memTx.height, int64(height))
 		goodTxs = append(goodTxs, memTx.tx)
 	}
 	return goodTxs
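With the mempool.go changes above, the caller-side sequence that the test below drives through its commitRange and updateRange helpers is roughly the following sketch. The commitAndUpdate function is illustrative only, not code from this commit; Reap, Update, AppendTx, GetHash and tmsp.RetCodeOK are the identifiers used in the test, while the imports and error formatting are assumed.

// Illustrative sketch: order is decided first, the txs are committed on
// consensus' own AppConn, and only then is the mempool told what was committed.
func commitAndUpdate(mem *Mempool, appConnCon proxy.AppConn, height int) error {
	// 1. Reap the candidate txs in mempool order.
	txs, err := mem.Reap()
	if err != nil {
		return err
	}
	// 2. Run them against the app on the consensus connection; the mempool
	//    no longer issues TMSP Commit/Rollback on its own connection.
	for _, tx := range txs {
		if _, retCode := appConnCon.AppendTx(tx); retCode != tmsp.RetCodeOK {
			return fmt.Errorf("AppendTx failed: %v", retCode)
		}
	}
	if _, retCode := appConnCon.GetHash(); retCode != tmsp.RetCodeOK {
		return fmt.Errorf("GetHash failed: %v", retCode)
	}
	// 3. Discard the committed txs from the mempool.
	return mem.Update(height, txs)
}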
@@ -2,6 +2,7 @@ package mempool
 
 import (
 	"encoding/binary"
+	"sync"
 	"testing"
 
 	"github.com/tendermint/tendermint/proxy"
@@ -13,13 +14,11 @@ import (
 func TestSerialReap(t *testing.T) {
 
 	app := example.NewCounterApplication(true)
-	appCtxMempool := app.Open()
-	proxyAppCtx := proxy.NewLocalAppContext(appCtxMempool)
-	mempool := NewMempool(proxyAppCtx)
-
-	// Create another AppContext for committing.
-	appCtxConsensus := app.Open()
-	appCtxConsensus.SetOption("serial", "on")
+	app.SetOption("serial", "on")
+	mtx := new(sync.Mutex)
+	appConnMem := proxy.NewLocalAppConn(mtx, app)
+	appConnCon := proxy.NewLocalAppConn(mtx, app)
+	mempool := NewMempool(appConnMem)
 
 	appendTxsRange := func(start, end int) {
 		// Append some txs.
@@ -28,24 +27,24 @@ func TestSerialReap(t *testing.T) {
 			// This will succeed
 			txBytes := make([]byte, 32)
 			binary.LittleEndian.PutUint64(txBytes, uint64(i))
-			err := mempool.AppendTx(txBytes)
+			err := mempool.CheckTx(txBytes)
 			if err != nil {
-				t.Fatal("Error after AppendTx: %v", err)
+				t.Fatal("Error after CheckTx: %v", err)
 			}
 
 			// This will fail because not serial (incrementing)
 			// However, error should still be nil.
 			// It just won't show up on Reap().
-			err = mempool.AppendTx(txBytes)
+			err = mempool.CheckTx(txBytes)
 			if err != nil {
-				t.Fatal("Error after AppendTx: %v", err)
+				t.Fatal("Error after CheckTx: %v", err)
 			}
 
 		}
 	}
 
 	reapCheck := func(exp int) {
-		txs, _, err := mempool.Reap()
+		txs, err := mempool.Reap()
 		if err != nil {
 			t.Error("Error in mempool.Reap()", err)
 		}
@@ -61,10 +60,7 @@ func TestSerialReap(t *testing.T) {
 			binary.LittleEndian.PutUint64(txBytes, uint64(i))
 			txs = append(txs, txBytes)
 		}
-		blockHeader := &types.Header{Height: 0}
-		blockData := &types.Data{Txs: txs}
-		block := &types.Block{Header: blockHeader, Data: blockData}
-		err := mempool.Update(block)
+		err := mempool.Update(0, txs)
 		if err != nil {
 			t.Error("Error in mempool.Update()", err)
 		}
@@ -75,12 +71,12 @@ func TestSerialReap(t *testing.T) {
 		for i := start; i < end; i++ {
 			txBytes := make([]byte, 32)
 			binary.LittleEndian.PutUint64(txBytes, uint64(i))
-			_, retCode := appCtxConsensus.AppendTx(txBytes)
+			_, retCode := appConnCon.AppendTx(txBytes)
 			if retCode != tmsp.RetCodeOK {
 				t.Error("Error committing tx", retCode)
 			}
 		}
-		retCode := appCtxConsensus.Commit()
+		_, retCode := appConnCon.GetHash()
 		if retCode != tmsp.RetCodeOK {
 			t.Error("Error committing range", retCode)
 		}
@@ -97,7 +93,7 @@ func TestSerialReap(t *testing.T) {
 	// Reap again. We should get the same amount
 	reapCheck(100)
 
-	// Append 0 to 999, we should reap 900 txs
+	// Append 0 to 999, we should reap 900 new txs
 	// because 100 were already counted.
 	appendTxsRange(0, 1000)
 
@@ -107,11 +103,16 @@ func TestSerialReap(t *testing.T) {
 	// Reap again. We should get the same amount
 	reapCheck(1000)
 
-	// Commit from the conensus AppContext
+	// Commit from the conensus AppConn
 	commitRange(0, 500)
 	updateRange(0, 500)
 
 	// We should have 500 left.
 	reapCheck(500)
 
+	// Append 100 invalid txs and 100 valid txs
+	appendTxsRange(900, 1100)
+
+	// We should have 600 now.
+	reapCheck(600)
 }
@@ -67,7 +67,7 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
 
 	switch msg := msg.(type) {
 	case *TxMessage:
-		err := memR.Mempool.AppendTx(msg.Tx)
+		err := memR.Mempool.CheckTx(msg.Tx)
 		if err != nil {
 			// Bad, seen, or conflicting tx.
 			log.Info("Could not add tx", "tx", msg.Tx)
@@ -81,9 +81,9 @@ func (memR *MempoolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
 	}
 }
 
-// Just an alias for AppendTx since broadcasting happens in peer routines
+// Just an alias for CheckTx since broadcasting happens in peer routines
 func (memR *MempoolReactor) BroadcastTx(tx types.Tx) error {
-	return memR.Mempool.AppendTx(tx)
+	return memR.Mempool.CheckTx(tx)
 }
 
 type PeerState interface {