txsMap is sync.Map

This commit is contained in:
Ethan Buchman 2019-03-30 15:01:27 -04:00
parent bcd03800ff
commit 5f469c692d
No known key found for this signature in database
GPG Key ID: DBB0B3EC64A4BDAA

View File

@ -177,9 +177,9 @@ type Mempool struct {
notifiedTxsAvailable bool
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
// map for quick access to txs
// Used in CheckTx to record the tx sender.
txsMap map[[sha256.Size]byte]*clist.CElement
// map for quick access to txs to record sender in CheckTx.
// txsMap: txKey -> CElement
txsMap sync.Map
// Atomic integers
height int64 // the last block Update()'d to
@ -212,7 +212,6 @@ func NewMempool(
config: config,
proxyAppConn: proxyAppConn,
txs: clist.New(),
txsMap: make(map[[sha256.Size]byte]*clist.CElement),
height: height,
rechecking: 0,
recheckCursor: nil,
@ -328,7 +327,7 @@ func (mem *Mempool) Flush() {
e.DetachPrev()
}
mem.txsMap = make(map[[sha256.Size]byte]*clist.CElement)
mem.txsMap = sync.Map{}
_ = atomic.SwapInt64(&mem.txsBytes, 0)
}
@ -393,8 +392,8 @@ func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo
// Note it's possible a tx is still in the cache but no longer in the mempool
// (eg. after committing a block, txs are removed from mempool but not cache),
// so we only record the sender for txs still in the mempool.
if e, ok := mem.txsMap[txKey(tx)]; ok {
memTx := e.Value.(*mempoolTx)
if e, ok := mem.txsMap.Load(txKey(tx)); ok {
memTx := e.(*clist.CElement).Value.(*mempoolTx)
if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded {
// TODO: consider punishing peer for dups,
// its non-trivial since invalid txs can become valid,
@ -485,7 +484,7 @@ func (mem *Mempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abci.Res
// - resCbFirstTime (lock not held) if tx is valid
func (mem *Mempool) addTx(memTx *mempoolTx) {
e := mem.txs.PushBack(memTx)
mem.txsMap[txKey(memTx.tx)] = e
mem.txsMap.Store(txKey(memTx.tx), e)
atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx)))
mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx)))
}
@ -496,7 +495,7 @@ func (mem *Mempool) addTx(memTx *mempoolTx) {
func (mem *Mempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) {
mem.txs.Remove(elem)
elem.DetachPrev()
delete(mem.txsMap, txKey(tx))
mem.txsMap.Delete(txKey(tx))
atomic.AddInt64(&mem.txsBytes, int64(-len(tx)))
if removeFromCache {
@ -768,8 +767,11 @@ func (mem *Mempool) recheckTxs(txs []types.Tx) {
type mempoolTx struct {
height int64 // height that this tx had been validated in
gasWanted int64 // amount of gas this tx states it will require
senders sync.Map // ids of peers who've sent us this tx (as a map for quick lookups)
tx types.Tx //
// ids of peers who've sent us this tx (as a map for quick lookups).
// senders: PeerID -> bool
senders sync.Map
}
// Height returns the height for this transaction