tendermint/mempool/reactor_test.go
Anton Kaliaev 5051a1f7bc
mempool: move interface into mempool package (#3524)
## Description

Refs #2659

Breaking changes in the mempool package:

- [mempool] #2659 `Mempool` is now an interface (a short before/after sketch of calling code follows this list)
  - old `Mempool` renamed to `CListMempool`
  - `NewMempool` renamed to `NewCListMempool`
  - `Option` renamed to `CListOption`
  - `MempoolReactor` renamed to `Reactor`
  - `NewMempoolReactor` renamed to `NewReactor`
  - unexpose `TxID` method
  - `TxInfo.PeerID` renamed to `TxInfo.SenderID`
  - unexpose `MempoolReactor.Mempool`
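
For orientation, a minimal before/after sketch of how calling code changes under these renames. It is illustrative only: the kvstore app and local client creator are stand-ins, error handling is abbreviated, and exact constructor arguments may differ slightly between versions.

```go
package main

import (
	"github.com/tendermint/tendermint/abci/example/kvstore"
	cfg "github.com/tendermint/tendermint/config"
	mempl "github.com/tendermint/tendermint/mempool"
	"github.com/tendermint/tendermint/proxy"
	"github.com/tendermint/tendermint/types"
)

func main() {
	config := cfg.DefaultConfig()
	cc := proxy.NewLocalClientCreator(kvstore.NewKVStoreApplication())
	appConnMem, err := cc.NewABCIClient()
	if err != nil {
		panic(err)
	}
	if err := appConnMem.Start(); err != nil {
		panic(err)
	}

	// before: mp := mempl.NewMempool(config.Mempool, appConnMem, 0)   // *Mempool (a struct)
	mp := mempl.NewCListMempool(config.Mempool, appConnMem, 0) // *CListMempool, satisfies the new Mempool interface

	// before: reactor := mempl.NewMempoolReactor(config.Mempool, mp)
	reactor := mempl.NewReactor(config.Mempool, mp)
	_ = reactor

	// before: mempl.TxInfo{PeerID: 1}
	_ = mp.CheckTxWithInfo(types.Tx("key=value"), nil, mempl.TxInfo{SenderID: 1})
}
```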

Breaking changes in the state package:

- [state] #2659 `Mempool` interface moved to the mempool package
  - `MockMempool` moved to the top-level mock package (`mempool/mock`) and renamed to `Mempool`
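
Consumers that previously used `MockMempool` from the state package (mostly tests) now take the no-op implementation from the new mock package. A rough sketch, assuming only the import path and type name change as described above:

```go
package main

import (
	"fmt"

	mempl "github.com/tendermint/tendermint/mempool"
	"github.com/tendermint/tendermint/mempool/mock"
)

func main() {
	// before: memplMock := sm.MockMempool{}   // from the state package
	var memplMock mempl.Mempool = mock.Mempool{}

	// The mock satisfies the whole interface with no-ops, so it can be passed
	// anywhere a Mempool is expected (e.g. state.NewBlockExecutor).
	fmt.Println(memplMock.Size()) // 0
}
```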

Non-breaking changes in the node package:

- [node] #2659 Add `Mempool` method, which allows you to access the mempool
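
A small sketch of the new accessor (assuming an already-constructed `*node.Node`); the point is that the returned value is the `Mempool` interface rather than a concrete type:

```go
package main

import (
	mempl "github.com/tendermint/tendermint/mempool"
	nm "github.com/tendermint/tendermint/node"
)

// mempoolSize is a hypothetical helper, shown only to illustrate that
// (*node.Node).Mempool() returns the Mempool interface.
func mempoolSize(n *nm.Node) int {
	var mp mempl.Mempool = n.Mempool()
	return mp.Size()
}

func main() {}
```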

## Commits

* move Mempool interface into mempool package

Refs #2659

Breaking changes in the mempool package:

- Mempool now an interface
- old Mempool renamed to CListMempool

Breaking changes to state package:

- MockMempool moved to mempool/mock package and renamed to Mempool
- Mempool interface moved to mempool package

* assert CListMempool impl Mempool

* gofmt code

* rename MempoolReactor to Reactor

- combine everything into one interface
- rename TxInfo.PeerID to TxInfo.SenderID
- unexpose MempoolReactor.Mempool

* move mempool mock into top-level mock package

* add a fixme

TxsFront should not be a part of the Mempool interface
because it leaks implementation details. Instead, we need to come up
with a general interface for querying the mempool so the MempoolReactor
can fetch and broadcast txs to peers.
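
For context, a sketch (not the actual reactor code) of why `TxsFront` ties callers to the clist-backed implementation: both the returned element type and its payload are CList-specific.

```go
package mempool

import "github.com/tendermint/tendermint/types"

// walkTxs is illustrative only: iterating via TxsFront forces the caller to
// deal with *clist.CElement nodes and the unexported mempoolTx payload.
func walkTxs(mempool *CListMempool, visit func(tx types.Tx)) {
	for e := mempool.TxsFront(); e != nil; e = e.Next() { // e is a *clist.CElement
		memTx := e.Value.(*mempoolTx) // payload type is also implementation-specific
		visit(memTx.tx)
	}
}
```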

* change node#Mempool to return interface

* save commit = new reactor arch

* Revert "save commit = new reactor arch"

This reverts commit 1bfceacd9d65a720574683a7f22771e69af9af4d.

* require CListMempool in mempool.Reactor

* add two changelog entries

* fixes after my own review

* quote interfaces, structs and functions

* fixes after Ismail's review

* make node's mempool an interface

* make InitWAL/CloseWAL methods a part of Mempool interface

* fix merge conflicts

* make node's mempool an interface
2019-05-04 10:41:31 +04:00


package mempool

import (
	"net"
	"sync"
	"testing"
	"time"

	"github.com/fortytw2/leaktest"
	"github.com/go-kit/kit/log/term"
	"github.com/pkg/errors"
	"github.com/stretchr/testify/assert"

	"github.com/tendermint/tendermint/abci/example/kvstore"
	cfg "github.com/tendermint/tendermint/config"
	"github.com/tendermint/tendermint/libs/log"
	"github.com/tendermint/tendermint/p2p"
	"github.com/tendermint/tendermint/p2p/mock"
	"github.com/tendermint/tendermint/proxy"
	"github.com/tendermint/tendermint/types"
)

type peerState struct {
	height int64
}

func (ps peerState) GetHeight() int64 {
	return ps.height
}

// mempoolLogger is a TestingLogger which uses a different
// color for each validator ("validator" key must exist).
func mempoolLogger() log.Logger {
	return log.TestingLoggerWithColorFn(func(keyvals ...interface{}) term.FgBgColor {
		for i := 0; i < len(keyvals)-1; i += 2 {
			if keyvals[i] == "validator" {
				return term.FgBgColor{Fg: term.Color(uint8(keyvals[i+1].(int) + 1))}
			}
		}
		return term.FgBgColor{}
	})
}

// connect N mempool reactors through N switches
func makeAndConnectReactors(config *cfg.Config, N int) []*Reactor {
	reactors := make([]*Reactor, N)
	logger := mempoolLogger()
	for i := 0; i < N; i++ {
		app := kvstore.NewKVStoreApplication()
		cc := proxy.NewLocalClientCreator(app)
		mempool, cleanup := newMempoolWithApp(cc)
		defer cleanup()

		reactors[i] = NewReactor(config.Mempool, mempool) // so we don't start the consensus states
		reactors[i].SetLogger(logger.With("validator", i))
	}

	p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch {
		s.AddReactor("MEMPOOL", reactors[i])
		return s
	}, p2p.Connect2Switches)
	return reactors
}
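
// waitForTxsOnReactors waits (up to TIMEOUT) until every reactor's mempool
// contains all of txs, checking each reactor concurrently.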
func waitForTxsOnReactors(t *testing.T, txs types.Txs, reactors []*Reactor) {
	// wait for the txs in all mempools
	wg := new(sync.WaitGroup)
	for i, reactor := range reactors {
		wg.Add(1)
		go func(r *Reactor, reactorIndex int) {
			defer wg.Done()
			waitForTxsOnReactor(t, txs, r, reactorIndex)
		}(reactor, i)
	}

	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	timer := time.After(TIMEOUT)
	select {
	case <-timer:
		t.Fatal("Timed out waiting for txs")
	case <-done:
	}
}
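
// waitForTxsOnReactor polls a single reactor's mempool until it holds at least
// len(txs) transactions, then verifies the reaped txs match txs in order.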
func waitForTxsOnReactor(t *testing.T, txs types.Txs, reactor *Reactor, reactorIndex int) {
	mempool := reactor.mempool
	for mempool.Size() < len(txs) {
		time.Sleep(time.Millisecond * 100)
	}

	reapedTxs := mempool.ReapMaxTxs(len(txs))
	for i, tx := range txs {
		assert.Equalf(t, tx, reapedTxs[i],
			"txs at index %d on reactor %d don't match: %v vs %v", i, reactorIndex, tx, reapedTxs[i])
	}
}

// ensure no txs on reactor after some timeout
func ensureNoTxs(t *testing.T, reactor *Reactor, timeout time.Duration) {
	time.Sleep(timeout) // wait for the txs in all mempools
	assert.Zero(t, reactor.mempool.Size())
}

const (
	NUM_TXS = 1000
	TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow
)
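
// TestReactorBroadcastTxMessage checks that txs submitted to one reactor's
// mempool are gossiped to and received by all connected peers.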
func TestReactorBroadcastTxMessage(t *testing.T) {
	config := cfg.TestConfig()
	const N = 4
	reactors := makeAndConnectReactors(config, N)
	defer func() {
		for _, r := range reactors {
			r.Stop()
		}
	}()
	for _, r := range reactors {
		for _, peer := range r.Switch.Peers().List() {
			peer.Set(types.PeerStateKey, peerState{1})
		}
	}

	// send a bunch of txs to the first reactor's mempool
	// and wait for them all to be received in the others
	txs := checkTxs(t, reactors[0].mempool, NUM_TXS, UnknownPeerID)
	waitForTxsOnReactors(t, txs, reactors)
}
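
// TestReactorNoBroadcastToSender checks that a reactor never sends a tx back
// to the peer it was received from.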
func TestReactorNoBroadcastToSender(t *testing.T) {
	config := cfg.TestConfig()
	const N = 2
	reactors := makeAndConnectReactors(config, N)
	defer func() {
		for _, r := range reactors {
			r.Stop()
		}
	}()

	// send a bunch of txs to the first reactor's mempool, claiming it came from peer
	// ensure peer gets no txs
	checkTxs(t, reactors[0].mempool, NUM_TXS, 1)
	ensureNoTxs(t, reactors[1], 100*time.Millisecond)
}
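
// TestBroadcastTxForPeerStopsWhenPeerStops checks that the per-peer
// broadcastTxRoutine exits (no goroutine leak) when the peer is stopped.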
func TestBroadcastTxForPeerStopsWhenPeerStops(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping test in short mode.")
	}

	config := cfg.TestConfig()
	const N = 2
	reactors := makeAndConnectReactors(config, N)
	defer func() {
		for _, r := range reactors {
			r.Stop()
		}
	}()

	// stop peer
	sw := reactors[1].Switch
	sw.StopPeerForError(sw.Peers().List()[0], errors.New("some reason"))

	// check that we are not leaking any go-routines
	// i.e. broadcastTxRoutine finishes when peer is stopped
	leaktest.CheckTimeout(t, 10*time.Second)()
}
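
// TestBroadcastTxForPeerStopsWhenReactorStops checks that the per-peer
// broadcastTxRoutine exits (no goroutine leak) when the reactor is stopped.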
func TestBroadcastTxForPeerStopsWhenReactorStops(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping test in short mode.")
	}

	config := cfg.TestConfig()
	const N = 2
	reactors := makeAndConnectReactors(config, N)

	// stop reactors
	for _, r := range reactors {
		r.Stop()
	}

	// check that we are not leaking any go-routines
	// i.e. broadcastTxRoutine finishes when reactor is stopped
	leaktest.CheckTimeout(t, 10*time.Second)()
}
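
// TestMempoolIDsBasic checks that peer IDs are assigned in increasing order
// and that, after Reclaim, the next reservation gets a fresh ID rather than
// the reclaimed one.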
func TestMempoolIDsBasic(t *testing.T) {
	ids := newMempoolIDs()

	peer := mock.NewPeer(net.IP{127, 0, 0, 1})

	ids.ReserveForPeer(peer)
	assert.EqualValues(t, 1, ids.GetForPeer(peer))
	ids.Reclaim(peer)

	ids.ReserveForPeer(peer)
	assert.EqualValues(t, 2, ids.GetForPeer(peer))
	ids.Reclaim(peer)
}
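
// TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs checks that reserving more
// than maxActiveIDs peer IDs panics (ID 0 is already reserved for UnknownPeerID).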
func TestMempoolIDsPanicsIfNodeRequestsOvermaxActiveIDs(t *testing.T) {
	if testing.Short() {
		return
	}

	// 0 is already reserved for UnknownPeerID
	ids := newMempoolIDs()

	for i := 0; i < maxActiveIDs-1; i++ {
		peer := mock.NewPeer(net.IP{127, 0, 0, 1})
		ids.ReserveForPeer(peer)
	}

	assert.Panics(t, func() {
		peer := mock.NewPeer(net.IP{127, 0, 0, 1})
		ids.ReserveForPeer(peer)
	})
}