package pubsub_test

import (
	"context"
	"fmt"
	"runtime/debug"
	"strconv"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/tendermint/tendermint/libs/log"

	"github.com/tendermint/tendermint/libs/pubsub"
	"github.com/tendermint/tendermint/libs/pubsub/query"
)

const (
	// clientID is the subscriber identifier passed to Subscribe, Unsubscribe
	// and UnsubscribeAll by the tests and benchmarks that only need a single
	// client.
	clientID = "test-client"
)

// TestSubscribe checks that a subscriber created without an explicit capacity
// receives a published message, is counted by NumClients and
// NumClientSubscriptions, and that publishing more messages than it reads
// does not block the publisher but instead cancels the subscription with
// ErrOutOfCapacity.
func TestSubscribe(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	// Fail immediately if the server cannot start; every later assertion
	// depends on a running server.
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
	require.NoError(t, err)

	assert.Equal(t, 1, s.NumClients())
	assert.Equal(t, 1, s.NumClientSubscriptions(clientID))

	err = s.Publish(ctx, "Ka-Zar")
	require.NoError(t, err)
	assertReceive(t, "Ka-Zar", subscription.Out())

	// Publish three messages from a separate goroutine without reading any of
	// them, so the test can detect a publisher that blocks on a slow client.
	published := make(chan struct{})
	go func() {
		defer close(published)

		err := s.Publish(ctx, "Quicksilver")
		assert.NoError(t, err)

		err = s.Publish(ctx, "Asylum")
		assert.NoError(t, err)

		err = s.Publish(ctx, "Ivan")
		assert.NoError(t, err)
	}()

	select {
	case <-published:
		// All publishes returned: the first message must still be readable
		// and the subscription must have been cancelled for overflowing.
		assertReceive(t, "Quicksilver", subscription.Out())
		assertCancelled(t, subscription, pubsub.ErrOutOfCapacity)
	case <-time.After(3 * time.Second):
		t.Fatal("Expected Publish(Asylum) not to block")
	}
}

// TestSubscribeWithCapacity checks that Subscribe panics when given a
// negative or zero capacity, and that a subscription created with capacity 1
// receives a published message.
func TestSubscribeWithCapacity(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	// Fail immediately if the server cannot start.
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	assert.Panics(t, func() {
		// The call is expected to panic, so its results are never observed.
		_, _ = s.Subscribe(ctx, clientID, query.Empty{}, -1)
	})
	assert.Panics(t, func() {
		// The call is expected to panic, so its results are never observed.
		_, _ = s.Subscribe(ctx, clientID, query.Empty{}, 0)
	})
	subscription, err := s.Subscribe(ctx, clientID, query.Empty{}, 1)
	require.NoError(t, err)
	err = s.Publish(ctx, "Aggamon")
	require.NoError(t, err)
	assertReceive(t, "Aggamon", subscription.Out())
}

// TestSubscribeUnbuffered checks that, with a subscription created through
// SubscribeUnbuffered, publishing a second message blocks until the
// subscriber reads, and that both messages are then delivered in order.
func TestSubscribeUnbuffered(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	// Fail immediately if the server cannot start.
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	subscription, err := s.SubscribeUnbuffered(ctx, clientID, query.Empty{})
	require.NoError(t, err)

	// Publish from a separate goroutine; published is closed only once both
	// Publish calls have returned.
	published := make(chan struct{})
	go func() {
		defer close(published)

		err := s.Publish(ctx, "Ultron")
		assert.NoError(t, err)

		err = s.Publish(ctx, "Darkhawk")
		assert.NoError(t, err)
	}()

	select {
	case <-published:
		// Nothing has been read yet, so the publisher should still be stuck.
		t.Fatal("Expected Publish(Darkhawk) to block")
	case <-time.After(3 * time.Second):
		// The publisher is still blocked after 3s; reading now must yield
		// both messages.
		assertReceive(t, "Ultron", subscription.Out())
		assertReceive(t, "Darkhawk", subscription.Out())
	}
}

// TestSlowClientIsRemovedWithErrOutOfCapacity checks that a subscriber which
// reads nothing while two messages are published has its subscription
// cancelled with ErrOutOfCapacity.
func TestSlowClientIsRemovedWithErrOutOfCapacity(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	// Fail immediately if the server cannot start.
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
	require.NoError(t, err)
	err = s.Publish(ctx, "Fat Cobra")
	require.NoError(t, err)
	err = s.Publish(ctx, "Viper")
	require.NoError(t, err)

	assertCancelled(t, subscription, pubsub.ErrOutOfCapacity)
}

// TestDifferentClients checks that each of several clients, subscribed with
// different queries, receives only the messages whose events satisfy its own
// query.
func TestDifferentClients(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	// Fail immediately if the server cannot start.
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	subscription1, err := s.Subscribe(ctx, "client-1", query.MustParse("tm.events.type='NewBlock'"))
	require.NoError(t, err)
	err = s.PublishWithEvents(ctx, "Iceman", map[string][]string{"tm.events.type": {"NewBlock"}})
	require.NoError(t, err)
	assertReceive(t, "Iceman", subscription1.Out())

	// A message carrying both events satisfies the queries of client-1 and
	// client-2, so both must receive it.
	subscription2, err := s.Subscribe(ctx, "client-2", query.MustParse("tm.events.type='NewBlock' AND abci.account.name='Igor'"))
	require.NoError(t, err)
	err = s.PublishWithEvents(ctx, "Ultimo", map[string][]string{"tm.events.type": {"NewBlock"}, "abci.account.name": {"Igor"}})
	require.NoError(t, err)
	assertReceive(t, "Ultimo", subscription1.Out())
	assertReceive(t, "Ultimo", subscription2.Out())

	// The published events lack two of the three conditions in client-3's
	// query, so nothing should be queued for it.
	subscription3, err := s.Subscribe(ctx, "client-3", query.MustParse("tm.events.type='NewRoundStep' AND abci.account.name='Igor' AND abci.invoice.number = 10"))
	require.NoError(t, err)
	err = s.PublishWithEvents(ctx, "Valeria Richards", map[string][]string{"tm.events.type": {"NewRoundStep"}})
	require.NoError(t, err)
	assert.Zero(t, len(subscription3.Out()))
}

// TestSubscribeDuplicateKeys publishes a message whose event keys each carry
// several values and checks, query by query, whether a fresh subscriber
// receives it. A nil expectation means nothing should be queued for that
// subscriber.
func TestSubscribeDuplicateKeys(t *testing.T) {
	srv := pubsub.NewServer()
	srv.SetLogger(log.TestingLogger())
	require.NoError(t, srv.Start())
	defer srv.Stop()

	bg := context.Background()

	type scenario struct {
		queryStr string
		want     interface{}
	}
	scenarios := []scenario{
		{queryStr: "withdraw.rewards='17'", want: "Iceman"},
		{queryStr: "withdraw.rewards='22'", want: "Iceman"},
		{queryStr: "withdraw.rewards='1' AND withdraw.rewards='22'", want: "Iceman"},
		{queryStr: "withdraw.rewards='100'", want: nil},
	}

	for idx, sc := range scenarios {
		// Each scenario subscribes under its own client ID.
		subscriber := fmt.Sprintf("client-%d", idx)
		sub, err := srv.Subscribe(bg, subscriber, query.MustParse(sc.queryStr))
		require.NoError(t, err)

		events := map[string][]string{
			"transfer.sender":  {"foo", "bar", "baz"},
			"withdraw.rewards": {"1", "17", "22"},
		}
		require.NoError(t, srv.PublishWithEvents(bg, "Iceman", events))

		if sc.want == nil {
			require.Zero(t, len(sub.Out()))
			continue
		}
		assertReceive(t, sc.want, sub.Out())
	}
}

// TestClientSubscribesTwice checks that subscribing the same client to the
// same query a second time returns an error and a nil subscription, and that
// the original subscription keeps receiving messages afterwards.
func TestClientSubscribesTwice(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	// Fail immediately if the server cannot start.
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	q := query.MustParse("tm.events.type='NewBlock'")

	subscription1, err := s.Subscribe(ctx, clientID, q)
	require.NoError(t, err)
	err = s.PublishWithEvents(ctx, "Goblin Queen", map[string][]string{"tm.events.type": {"NewBlock"}})
	require.NoError(t, err)
	assertReceive(t, "Goblin Queen", subscription1.Out())

	// Same client, same query: the duplicate must be rejected.
	subscription2, err := s.Subscribe(ctx, clientID, q)
	require.Error(t, err)
	require.Nil(t, subscription2)

	// The rejected duplicate must not disturb the first subscription.
	err = s.PublishWithEvents(ctx, "Spider-Man", map[string][]string{"tm.events.type": {"NewBlock"}})
	require.NoError(t, err)
	assertReceive(t, "Spider-Man", subscription1.Out())
}

// TestUnsubscribe checks that after Unsubscribe nothing is queued on the
// subscription's output channel and that the subscription is cancelled with
// ErrUnsubscribed.
func TestUnsubscribe(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	// Fail immediately if the server cannot start.
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	subscription, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
	require.NoError(t, err)
	err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
	require.NoError(t, err)

	err = s.Publish(ctx, "Nick Fury")
	require.NoError(t, err)
	assert.Zero(t, len(subscription.Out()), "Should not receive anything after Unsubscribe")

	assertCancelled(t, subscription, pubsub.ErrUnsubscribed)
}

// TestClientUnsubscribesTwice checks that, once a client's only subscription
// has been removed, both a repeated Unsubscribe and an UnsubscribeAll return
// ErrSubscriptionNotFound.
func TestClientUnsubscribesTwice(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	// Fail immediately if the server cannot start.
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	_, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
	require.NoError(t, err)
	err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
	require.NoError(t, err)

	// The subscription is already gone, so both removal calls must fail.
	err = s.Unsubscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
	assert.Equal(t, pubsub.ErrSubscriptionNotFound, err)
	err = s.UnsubscribeAll(ctx, clientID)
	assert.Equal(t, pubsub.ErrSubscriptionNotFound, err)
}

// TestResubscribe checks that a client can subscribe again to a query it has
// previously unsubscribed from, and that the new subscription receives
// published messages.
func TestResubscribe(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	// Fail immediately if the server cannot start.
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	_, err := s.Subscribe(ctx, clientID, query.Empty{})
	require.NoError(t, err)
	err = s.Unsubscribe(ctx, clientID, query.Empty{})
	require.NoError(t, err)
	subscription, err := s.Subscribe(ctx, clientID, query.Empty{})
	require.NoError(t, err)

	err = s.Publish(ctx, "Cable")
	require.NoError(t, err)
	assertReceive(t, "Cable", subscription.Out())
}

// TestUnsubscribeAll checks that UnsubscribeAll removes every subscription
// held by a client: nothing is queued on either output channel afterwards and
// both subscriptions are cancelled with ErrUnsubscribed.
func TestUnsubscribeAll(t *testing.T) {
	s := pubsub.NewServer()
	s.SetLogger(log.TestingLogger())
	// Fail immediately if the server cannot start.
	require.NoError(t, s.Start())
	defer s.Stop()

	ctx := context.Background()
	subscription1, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlock'"))
	require.NoError(t, err)
	subscription2, err := s.Subscribe(ctx, clientID, query.MustParse("tm.events.type='NewBlockHeader'"))
	require.NoError(t, err)

	err = s.UnsubscribeAll(ctx, clientID)
	require.NoError(t, err)

	err = s.Publish(ctx, "Nick Fury")
	require.NoError(t, err)
	assert.Zero(t, len(subscription1.Out()), "Should not receive anything after UnsubscribeAll")
	assert.Zero(t, len(subscription2.Out()), "Should not receive anything after UnsubscribeAll")

	assertCancelled(t, subscription1, pubsub.ErrUnsubscribed)
	assertCancelled(t, subscription2, pubsub.ErrUnsubscribed)
}

// TestBufferCapacity builds a server with the BufferCapacity(2) option and
// never starts it. It expects the configured capacity to be reported back,
// the first two Publish calls to succeed, and a third Publish under a 10ms
// deadline to fail with context.DeadlineExceeded.
func TestBufferCapacity(t *testing.T) {
	const capacity = 2

	srv := pubsub.NewServer(pubsub.BufferCapacity(capacity))
	srv.SetLogger(log.TestingLogger())
	assert.Equal(t, capacity, srv.BufferCapacity())

	background := context.Background()
	for _, msg := range []string{"Nighthawk", "Sage"} {
		require.NoError(t, srv.Publish(background, msg))
	}

	// One publish beyond the configured capacity, bounded by a short
	// deadline so the test cannot hang.
	deadlineCtx, cancel := context.WithTimeout(background, 10*time.Millisecond)
	defer cancel()

	publishErr := srv.Publish(deadlineCtx, "Ironclad")
	if !assert.Error(t, publishErr) {
		return
	}
	assert.Equal(t, context.DeadlineExceeded, publishErr)
}

// Benchmark10Clients runs benchmarkNClients with 10 subscriptions.
func Benchmark10Clients(b *testing.B) {
	benchmarkNClients(10, b)
}

// Benchmark100Clients runs benchmarkNClients with 100 subscriptions.
func Benchmark100Clients(b *testing.B) {
	benchmarkNClients(100, b)
}

// Benchmark1000Clients runs benchmarkNClients with 1000 subscriptions.
func Benchmark1000Clients(b *testing.B) {
	benchmarkNClients(1000, b)
}

// Benchmark10ClientsOneQuery runs benchmarkNClientsOneQuery with 10
// subscriptions.
func Benchmark10ClientsOneQuery(b *testing.B) {
	benchmarkNClientsOneQuery(10, b)
}

// Benchmark100ClientsOneQuery runs benchmarkNClientsOneQuery with 100
// subscriptions.
func Benchmark100ClientsOneQuery(b *testing.B) {
	benchmarkNClientsOneQuery(100, b)
}

// Benchmark1000ClientsOneQuery runs benchmarkNClientsOneQuery with 1000
// subscriptions.
func Benchmark1000ClientsOneQuery(b *testing.B) {
	benchmarkNClientsOneQuery(1000, b)
}

// benchmarkNClients measures PublishWithEvents against n subscriptions, each
// registered under clientID with its own query of the form
// "abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = <i>". A goroutine
// per subscription drains its output channel until the subscription is
// cancelled.
//
// n is the number of subscriptions to create; b is the benchmark handle.
func benchmarkNClients(n int, b *testing.B) {
	s := pubsub.NewServer()
	if err := s.Start(); err != nil {
		b.Fatal(err)
	}
	defer s.Stop()

	ctx := context.Background()
	for i := 0; i < n; i++ {
		subscription, err := s.Subscribe(
			ctx,
			clientID,
			query.MustParse(fmt.Sprintf("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = %d", i)),
		)
		if err != nil {
			b.Fatal(err)
		}
		// Keep reading so this subscription never fills up during the
		// timed loop.
		go func() {
			for {
				select {
				case <-subscription.Out():
					continue
				case <-subscription.Cancelled():
					return
				}
			}
		}()
	}

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		events := map[string][]string{
			"abci.Account.Owner": {"Ivan"},
			// Format i as decimal text so it can match the %d used in the
			// queries above; string(i) would yield a single rune instead.
			"abci.Invoices.Number": {strconv.Itoa(i)},
		}
		if err := s.PublishWithEvents(ctx, "Gamora", events); err != nil {
			b.Fatal(err)
		}
	}
}

// benchmarkNClientsOneQuery measures PublishWithEvents against n
// subscriptions that all use the same query. A goroutine per subscription
// drains its output channel until the subscription is cancelled.
//
// n is the number of subscriptions to create; b is the benchmark handle.
func benchmarkNClientsOneQuery(n int, b *testing.B) {
	s := pubsub.NewServer()
	if err := s.Start(); err != nil {
		b.Fatal(err)
	}
	defer s.Stop()

	ctx := context.Background()
	q := query.MustParse("abci.Account.Owner = 'Ivan' AND abci.Invoices.Number = 1")
	for i := 0; i < n; i++ {
		// Each subscription needs its own client ID: subscribing one client
		// to the same query twice is rejected with an error, which would
		// abort the benchmark on the second iteration.
		id := fmt.Sprintf("%s-%d", clientID, i)
		subscription, err := s.Subscribe(ctx, id, q)
		if err != nil {
			b.Fatal(err)
		}
		// Keep reading so this subscription never fills up during the
		// timed loop.
		go func() {
			for {
				select {
				case <-subscription.Out():
					continue
				case <-subscription.Cancelled():
					return
				}
			}
		}()
	}

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		events := map[string][]string{
			"abci.Account.Owner":   {"Ivan"},
			"abci.Invoices.Number": {"1"},
		}
		if err := s.PublishWithEvents(ctx, "Gamora", events); err != nil {
			b.Fatal(err)
		}
	}
}

///////////////////////////////////////////////////////////////////////////////
/// HELPERS
///////////////////////////////////////////////////////////////////////////////

// assertReceive waits up to one second for a message on ch and asserts that
// its Data() equals expected. If nothing arrives in time it records a test
// error and prints the current stack.
//
// msgAndArgs is forwarded to assert.Equal as the optional failure message.
func assertReceive(t *testing.T, expected interface{}, ch <-chan pubsub.Message, msgAndArgs ...interface{}) {
	// Attribute failures to the calling test line rather than this helper.
	t.Helper()

	select {
	case actual := <-ch:
		assert.Equal(t, expected, actual.Data(), msgAndArgs...)
	case <-time.After(1 * time.Second):
		t.Errorf("Expected to receive %v from the channel, got nothing after 1s", expected)
		debug.PrintStack()
	}
}

// assertCancelled waits up to one second for subscription's Cancelled channel
// to be closed and asserts that subscription.Err() equals err. If the channel
// is not closed in time it records a test error instead of blocking forever.
func assertCancelled(t *testing.T, subscription *pubsub.Subscription, err error) {
	// Attribute failures to the calling test line rather than this helper.
	t.Helper()

	select {
	case _, ok := <-subscription.Cancelled():
		// A closed channel yields ok == false; a received value would mean
		// something was sent on it instead.
		assert.False(t, ok)
		assert.Equal(t, err, subscription.Err())
	case <-time.After(1 * time.Second):
		t.Errorf("Expected subscription to be cancelled with %v, still active after 1s", err)
	}
}