package dedup

import (
	"fmt"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/testutil"
)

func TestMetrics(t *testing.T) {
	now := time.Now()

	tests := []struct {
		name         string
		input        []telegraf.Metric
		expected     []telegraf.Metric
		cacheContent []telegraf.Metric
	}{
		{
			name: "retain metric",
			input: []telegraf.Metric{
				metric.New("m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now,
				),
			},
			expected: []telegraf.Metric{
				metric.New("m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now,
				),
			},
			cacheContent: []telegraf.Metric{
				metric.New("m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now,
				),
			},
		},
		{
			name: "suppress repeated metric",
			input: []telegraf.Metric{
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-1*time.Second),
				),
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now,
				),
			},
			expected: []telegraf.Metric{
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-1*time.Second),
				),
			},
			cacheContent: []telegraf.Metric{
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-1*time.Second),
				),
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-1*time.Second),
				),
			},
		},
		{
			name: "pass updated metric",
			input: []telegraf.Metric{
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-1*time.Second),
				),
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 2},
					now,
				),
			},
			expected: []telegraf.Metric{
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-1*time.Second),
				),
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 2},
					now,
				),
			},
			cacheContent: []telegraf.Metric{
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-1*time.Second),
				),
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 2},
					now,
				),
			},
		},
		{
			name: "pass after cache expired",
			input: []telegraf.Metric{
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-1*time.Hour),
				),
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now,
				),
			},
			expected: []telegraf.Metric{
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-1*time.Hour),
				),
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now,
				),
			},
			cacheContent: []telegraf.Metric{
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-1*time.Hour),
				),
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now,
				),
			},
		},
		{
			name: "cache retains metrics",
			input: []telegraf.Metric{
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-3*time.Hour),
				),
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-2*time.Hour),
				),
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now,
				),
			},
			expected: []telegraf.Metric{
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-3*time.Hour),
				),
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-2*time.Hour),
				),
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now,
				),
			},
			cacheContent: []telegraf.Metric{
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-3*time.Hour),
				),
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now.Add(-2*time.Hour),
				),
				metric.New(
					"m1",
					map[string]string{"tag": "tag_value"},
					map[string]interface{}{"value": 1},
					now,
				),
			},
		},
		{
			name: "same timestamp",
			input: []telegraf.Metric{
				metric.New("metric",
					map[string]string{"tag": "value"},
					map[string]interface{}{"foo": 1}, // field
					now,
				),
				metric.New(
					"metric",
					map[string]string{"tag": "value"},
					map[string]interface{}{"bar": 1}, // different field
					now,
				),
				metric.New(
					"metric",
					map[string]string{"tag": "value"},
					map[string]interface{}{"bar": 2}, // same field different value
					now,
				),
				metric.New(
					"metric",
					map[string]string{"tag": "value"},
					map[string]interface{}{"bar": 2}, // same field same value
					now,
				),
			},
			expected: []telegraf.Metric{
				metric.New(
					"metric",
					map[string]string{"tag": "value"},
					map[string]interface{}{"foo": 1},
					now,
				),
				metric.New(
					"metric",
					map[string]string{"tag": "value"},
					map[string]interface{}{"bar": 1},
					now,
				),
				metric.New(
					"metric",
					map[string]string{"tag": "value"},
					map[string]interface{}{"bar": 2},
					now,
				),
			},
			cacheContent: []telegraf.Metric{
				metric.New("metric",
					map[string]string{"tag": "value"},
					map[string]interface{}{"foo": 1},
					now,
				),
				metric.New(
					"metric",
					map[string]string{"tag": "value"},
					map[string]interface{}{"foo": 1, "bar": 1},
					now,
				),
				metric.New(
					"metric",
					map[string]string{"tag": "value"},
					map[string]interface{}{"bar": 2},
					now,
				),
				metric.New(
					"metric",
					map[string]string{"tag": "value"},
					map[string]interface{}{"bar": 2},
					now,
				),
			},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Create plugin instance
			plugin := &Dedup{
				DedupInterval: config.Duration(10 * time.Minute),
				flushTime:     now.Add(-1 * time.Second),
				cache:         make(map[uint64]telegraf.Metric),
			}

			// Feed the input metrics and record the outputs
			var actual []telegraf.Metric
			for i, m := range tt.input {
				actual = append(actual, plugin.Apply(m)...)

				// Check the cache content
				if cm := tt.cacheContent[i]; cm == nil {
					require.Empty(t, plugin.cache)
				} else {
					id := m.HashID()
					require.NotEmpty(t, plugin.cache)
					require.Contains(t, plugin.cache, id)
					testutil.RequireMetricEqual(t, cm, plugin.cache[id])
				}
			}

			// Check if we got the expected metrics
			testutil.RequireMetricsEqual(t, tt.expected, actual)
		})
	}
}

func TestCacheShrink(t *testing.T) {
	now := time.Now()

	// Time offset is more than 2 * DedupInterval
	plugin := &Dedup{
		DedupInterval: config.Duration(10 * time.Minute),
		flushTime:     now.Add(-2 * time.Hour),
		cache:         make(map[uint64]telegraf.Metric),
	}

	// Time offset is more than 1 * DedupInterval
	input := []telegraf.Metric{
		metric.New(
			"m1",
			map[string]string{"tag": "tag_value"},
			map[string]interface{}{"value": 1},
			now.Add(-1*time.Hour),
		),
	}
	actual := plugin.Apply(input...)
	expected := input
	testutil.RequireMetricsEqual(t, expected, actual)
	require.Empty(t, plugin.cache)
}

func TestTracking(t *testing.T) {
	now := time.Now()

	inputRaw := []telegraf.Metric{
		metric.New("metric",
			map[string]string{"tag": "value"},
			map[string]interface{}{"foo": 1},
			now.Add(-2*time.Second),
		),
		metric.New("metric",
			map[string]string{"tag": "pass"},
			map[string]interface{}{"foo": 1},
			now.Add(-2*time.Second),
		),
		metric.New("metric",
			map[string]string{"tag": "value"},
			map[string]interface{}{"foo": 1},
			now.Add(-1*time.Second),
		),
		metric.New("metric",
			map[string]string{"tag": "pass"},
			map[string]interface{}{"foo": 1},
			now.Add(-1*time.Second),
		),
		metric.New(
			"metric",
			map[string]string{"tag": "value"},
			map[string]interface{}{"foo": 3},
			now,
		),
	}

	var mu sync.Mutex
	delivered := make([]telegraf.DeliveryInfo, 0, len(inputRaw))
	notify := func(di telegraf.DeliveryInfo) {
		mu.Lock()
		defer mu.Unlock()
		delivered = append(delivered, di)
	}

	input := make([]telegraf.Metric, 0, len(inputRaw))
	for _, m := range inputRaw {
		tm, _ := metric.WithTracking(m, notify)
		input = append(input, tm)
	}

	expected := []telegraf.Metric{
		metric.New("metric",
			map[string]string{"tag": "value"},
			map[string]interface{}{"foo": 1},
			now.Add(-2*time.Second),
		),
		metric.New("metric",
			map[string]string{"tag": "pass"},
			map[string]interface{}{"foo": 1},
			now.Add(-2*time.Second),
		),
		metric.New(
			"metric",
			map[string]string{"tag": "value"},
			map[string]interface{}{"foo": 3},
			now,
		),
	}

	// Create plugin instance
	plugin := &Dedup{
		DedupInterval: config.Duration(10 * time.Minute),
		flushTime:     now.Add(-1 * time.Second),
		cache:         make(map[uint64]telegraf.Metric),
	}

	// Process expected metrics and compare with resulting metrics
	actual := plugin.Apply(input...)
	testutil.RequireMetricsEqual(t, expected, actual)

	// Simulate output acknowledging delivery
	for _, m := range actual {
		m.Accept()
	}

	// Check delivery
	require.Eventuallyf(t, func() bool {
		mu.Lock()
		defer mu.Unlock()
		return len(input) == len(delivered)
	}, time.Second, 100*time.Millisecond, "%d delivered but %d expected", len(delivered), len(expected))
}

func TestStatePersistence(t *testing.T) {
	now := time.Now()

	// Define the metrics and states
	state := fmt.Sprintf("metric,tag=value foo=1i %d\n", now.Add(-1*time.Minute).UnixNano())
	input := []telegraf.Metric{
		metric.New("metric",
			map[string]string{"tag": "value"},
			map[string]interface{}{"foo": 1},
			now.Add(-2*time.Second),
		),
		metric.New("metric",
			map[string]string{"tag": "pass"},
			map[string]interface{}{"foo": 1},
			now.Add(-1*time.Second),
		),
		metric.New(
			"metric",
			map[string]string{"tag": "value"},
			map[string]interface{}{"foo": 3},
			now,
		),
	}

	expected := []telegraf.Metric{
		metric.New("metric",
			map[string]string{"tag": "pass"},
			map[string]interface{}{"foo": 1},
			now.Add(-1*time.Second),
		),
		metric.New(
			"metric",
			map[string]string{"tag": "value"},
			map[string]interface{}{"foo": 3},
			now,
		),
	}
	expectedState := []string{
		fmt.Sprintf("metric,tag=pass foo=1i %d\n", now.Add(-1*time.Second).UnixNano()),
		fmt.Sprintf("metric,tag=value foo=3i %d\n", now.UnixNano()),
	}

	// Configure the plugin
	plugin := &Dedup{
		DedupInterval: config.Duration(10 * time.Hour), // use a long interval to avoid flaky tests
		flushTime:     now.Add(-1 * time.Second),
		cache:         make(map[uint64]telegraf.Metric),
	}
	require.Empty(t, plugin.cache)

	// Setup the "persisted" state
	var pi telegraf.StatefulPlugin = plugin
	require.NoError(t, pi.SetState([]byte(state)))
	require.Len(t, plugin.cache, 1)

	// Process expected metrics and compare with resulting metrics
	actual := plugin.Apply(input...)
	testutil.RequireMetricsEqual(t, expected, actual)

	// Check getting the persisted state
	// Because the cache is a map, the order of metrics in the state is not
	// guaranteed, so check the string contents regardless of the order.
	actualState, ok := pi.GetState().([]byte)
	require.True(t, ok, "state is not a bytes array")
	var expectedLen int
	for _, m := range expectedState {
		require.Contains(t, string(actualState), m)
		expectedLen += len(m)
	}
	require.Len(t, actualState, expectedLen)
}
