package iotdb

import (
	"math"
	"strconv"
	"testing"
	"time"

	"github.com/apache/iotdb-client-go/client"
	"github.com/stretchr/testify/require"
	"github.com/testcontainers/testcontainers-go/wait"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/metric"
	"github.com/influxdata/telegraf/testutil"
)

// newMetricWithOrderedFields builds a metric by starting from an empty one and
// adding the given tags and fields one at a time, in slice order. Tests use it
// to define expected output where the field order needs to be defined.
func newMetricWithOrderedFields(
	name string,
	tags []telegraf.Tag,
	fields []telegraf.Field,
	timestamp time.Time,
) telegraf.Metric {
	// Start empty so that nothing is inserted through an unordered map.
	ordered := metric.New(name, make(map[string]string), make(map[string]interface{}), timestamp)
	for i := range tags {
		ordered.AddTag(tags[i].Key, tags[i].Value)
	}
	for i := range fields {
		ordered.AddField(fields[i].Key, fields[i].Value)
	}
	return ordered
}

// TestInitInvalid checks that Init fails with the expected error message for
// each invalid setting.
func TestInitInvalid(t *testing.T) {
	// build returns a plugin created by newIoTDB with one setting overridden
	// and a test logger attached.
	build := func(override func(p *IoTDB)) *IoTDB {
		p := newIoTDB()
		override(p)
		p.Log = &testutil.Logger{}
		return p
	}

	cases := []struct {
		name     string
		plugin   *IoTDB
		expected string
	}{
		{
			name:     "empty tag-conversion",
			plugin:   build(func(p *IoTDB) { p.TreatTagsAs = "" }),
			expected: `unknown 'convert_tags_to' method ""`,
		},
		{
			name:     "empty uint-conversion",
			plugin:   build(func(p *IoTDB) { p.ConvertUint64To = "" }),
			expected: `unknown 'uint64_conversion' method ""`,
		},
		{
			name:     "empty timestamp precision",
			plugin:   build(func(p *IoTDB) { p.TimeStampUnit = "" }),
			expected: `unknown 'timestamp_precision' method ""`,
		},
		{
			name:     "invalid tag-conversion",
			plugin:   build(func(p *IoTDB) { p.TreatTagsAs = "garbage" }),
			expected: `unknown 'convert_tags_to' method "garbage"`,
		},
		{
			name:     "invalid uint-conversion",
			plugin:   build(func(p *IoTDB) { p.ConvertUint64To = "garbage" }),
			expected: `unknown 'uint64_conversion' method "garbage"`,
		},
		{
			name:     "invalid timestamp precision",
			plugin:   build(func(p *IoTDB) { p.TimeStampUnit = "garbage" }),
			expected: `unknown 'timestamp_precision' method "garbage"`,
		},
		{
			name:     "negative timeout",
			plugin:   build(func(p *IoTDB) { p.Timeout = config.Duration(time.Second * -5) }),
			expected: `negative timeout`,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			err := tc.plugin.Init()
			require.EqualError(t, err, tc.expected)
		})
	}
}

// TestMetricConversionToRecordsWithTags exercises the function
// `convertMetricsToRecordsWithTags` and compares the produced records, with
// the tags-list cleared, against the expected ones.
func TestMetricConversionToRecordsWithTags(t *testing.T) {
	// Unsigned value above math.MaxInt64, fed into every uint64 case.
	const bigUint = uint64(math.MaxInt64 + 1000)

	ts := time.Date(2022, time.July, 20, 12, 25, 33, 44, time.UTC)
	nanos := ts.UnixNano()

	// Small constructors for plugins deviating from newIoTDB in one setting.
	withUintConversion := func(method string) *IoTDB {
		p := newIoTDB()
		p.ConvertUint64To = method
		return p
	}
	withTimestampUnit := func(unit string) *IoTDB {
		p := newIoTDB()
		p.TimeStampUnit = unit
		return p
	}

	cases := []struct {
		name     string
		plugin   *IoTDB
		expected recordsWithTags
		metrics  []telegraf.Metric
	}{
		{
			name:   "default config",
			plugin: newIoTDB(),
			expected: recordsWithTags{
				DeviceIDList: []string{
					"root.computer.fan",
					"root.computer.fan",
					"root.computer.keyboard",
				},
				MeasurementsList: [][]string{
					{"temperature", "counter"},
					{"counter", "temperature"},
					{"temperature", "counter", "unsigned_big", "string", "bool", "int_text"},
				},
				ValuesList: [][]interface{}{
					{42.55, int64(987654321)},
					{int64(123456789), 56.24},
					{30.33, int64(123456789), int64(math.MaxInt64), "Made in China.", false, "123456789011"},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64},
					{client.INT64, client.DOUBLE},
					{client.DOUBLE, client.INT64, client.INT64, client.TEXT, client.BOOLEAN, client.TEXT},
				},
				TimestampList: []int64{nanos, nanos, nanos},
			},
			metrics: []telegraf.Metric{
				newMetricWithOrderedFields(
					"root.computer.fan",
					[]telegraf.Tag{
						{Key: "price", Value: "expensive"},
						{Key: "owner", Value: "cpu"},
					},
					[]telegraf.Field{
						{Key: "temperature", Value: 42.55},
						{Key: "counter", Value: int64(987654321)},
					},
					ts,
				),
				// Same tag and field keys as above, listed in a different order.
				newMetricWithOrderedFields(
					"root.computer.fan",
					[]telegraf.Tag{
						{Key: "owner", Value: "gpu"},
						{Key: "price", Value: "cheap"},
					},
					[]telegraf.Field{
						{Key: "counter", Value: int64(123456789)},
						{Key: "temperature", Value: 56.24},
					},
					ts,
				),
				newMetricWithOrderedFields(
					"root.computer.keyboard",
					nil,
					[]telegraf.Field{
						{Key: "temperature", Value: 30.33},
						{Key: "counter", Value: int64(123456789)},
						{Key: "unsigned_big", Value: bigUint},
						{Key: "string", Value: "Made in China."},
						{Key: "bool", Value: false},
						{Key: "int_text", Value: "123456789011"},
					},
					ts,
				),
			},
		},
		{
			name:   "unsigned int to text",
			plugin: withUintConversion("text"),
			expected: recordsWithTags{
				DeviceIDList:     []string{"root.computer.uint_to_text"},
				MeasurementsList: [][]string{{"unsigned_big"}},
				ValuesList:       [][]interface{}{{strconv.FormatUint(bigUint, 10)}},
				DataTypesList:    [][]client.TSDataType{{client.TEXT}},
				TimestampList:    []int64{nanos},
			},
			metrics: []telegraf.Metric{
				newMetricWithOrderedFields(
					"root.computer.uint_to_text",
					nil,
					[]telegraf.Field{{Key: "unsigned_big", Value: bigUint}},
					ts,
				),
			},
		},
		{
			name:   "unsigned int to int with overflow",
			plugin: withUintConversion("int64"),
			expected: recordsWithTags{
				DeviceIDList:     []string{"root.computer.overflow"},
				MeasurementsList: [][]string{{"unsigned_big"}},
				ValuesList:       [][]interface{}{{int64(-9223372036854774809)}},
				DataTypesList:    [][]client.TSDataType{{client.INT64}},
				TimestampList:    []int64{nanos},
			},
			metrics: []telegraf.Metric{
				newMetricWithOrderedFields(
					"root.computer.overflow",
					nil,
					[]telegraf.Field{{Key: "unsigned_big", Value: bigUint}},
					ts,
				),
			},
		},
		{
			name:   "second timestamp precision",
			plugin: withTimestampUnit("second"),
			expected: recordsWithTags{
				DeviceIDList:     []string{"root.computer.second"},
				MeasurementsList: [][]string{{"unsigned_big"}},
				ValuesList:       [][]interface{}{{int64(math.MaxInt64)}},
				DataTypesList:    [][]client.TSDataType{{client.INT64}},
				TimestampList:    []int64{ts.Unix()},
			},
			metrics: []telegraf.Metric{
				newMetricWithOrderedFields(
					"root.computer.second",
					nil,
					[]telegraf.Field{{Key: "unsigned_big", Value: bigUint}},
					ts,
				),
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			tc.plugin.Log = &testutil.Logger{}
			require.NoError(t, tc.plugin.Init())

			got, err := tc.plugin.convertMetricsToRecordsWithTags(tc.metrics)
			require.NoError(t, err)

			// The tags-list is not part of the comparison.
			got.TagsList = nil
			want := tc.expected
			require.EqualValues(t, &want, got)
		})
	}
}

// TestTagSanitization runs `validateTag` over a list of input tags for each
// `SanitizeTags` version setting and compares the returned tags with the
// expected ones. Errors returned by `validateTag` are not asserted directly;
// the error cases are covered by expecting an empty string for the tag.
//
// Every subtest carries a unique name (including the version) so a failure
// can be attributed to one case and selected with `-run` without relying on
// the `#01` suffixes Go appends to duplicated subtest names.
func TestTagSanitization(t *testing.T) {
	// forVersion returns a plugin from newIoTDB with SanitizeTags set.
	forVersion := func(version string) *IoTDB {
		s := newIoTDB()
		s.SanitizeTags = version
		return s
	}

	tests := []struct {
		name     string
		plugin   *IoTDB
		expected []string
		input    []string
	}{
		{ // tags that are returned unchanged with the IoTDB V1.3 setting
			name:     "v1.3 tags left unchanged",
			plugin:   forVersion("1.3"),
			expected: []string{"word", "`word`", "word_"},
			input:    []string{"word", "`word`", "word_"},
		},
		{ // tags containing unsupported characters on IoTDB V1.3 get enclosed in backticks
			name:     "v1.3 tags enclosed in backticks",
			plugin:   forVersion("1.3"),
			expected: []string{"`wo rd`", "`@`", "`$`", "`#`", "`:`", "`{`", "`}`", "`1`", "`1234`"},
			input:    []string{"wo rd", "@", "$", "#", ":", "{", "}", "1", "1234"},
		},
		{ // forbidden word and forbidden syntax on IoTDB V1.3
			name:     "v1.3 forbidden tags",
			plugin:   forVersion("1.3"),
			expected: []string{"", ""},
			input:    []string{"root", "wo`rd"},
		},
		{ // tags that are returned unchanged with the IoTDB V0.13 setting
			name:     "v0.13 tags left unchanged",
			plugin:   forVersion("0.13"),
			expected: []string{"word", "`word`", "word_", "@", "$", "#", ":", "{", "}"},
			input:    []string{"word", "`word`", "word_", "@", "$", "#", ":", "{", "}"},
		},
		{ // tags containing unsupported characters on IoTDB V0.13 get enclosed in backticks
			name:     "v0.13 tags enclosed in backticks",
			plugin:   forVersion("0.13"),
			expected: []string{"`wo rd`", "`\\`"},
			input:    []string{"wo rd", "\\"},
		},
		{ // forbidden word and forbidden syntax on IoTDB V0.13
			name:     "v0.13 forbidden tags",
			plugin:   forVersion("0.13"),
			expected: []string{"", ""},
			input:    []string{"root", "wo`rd"},
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			tt.plugin.Log = &testutil.Logger{}
			require.NoError(t, tt.plugin.Init())

			actuals := make([]string, 0, len(tt.input))
			for _, input := range tt.input {
				//nolint:errcheck // error cases handled by expected vs actual comparison
				actual, _ := tt.plugin.validateTag(input)
				actuals = append(actuals, actual)
			}

			require.EqualValues(t, tt.expected, actuals)
		})
	}
}

// TestTagsHandling tests the function `modifyRecordsWithTags`: each case
// passes a set of records carrying a tags-list to the plugin and compares the
// modified records, with the tags-list cleared, against the expected ones.
func TestTagsHandling(t *testing.T) {
	var testTimestamp = time.Date(2022, time.July, 20, 12, 25, 33, 44, time.UTC)

	tests := []struct {
		name     string
		plugin   *IoTDB
		expected recordsWithTags
		input    recordsWithTags
	}{
		{ // treat tags as fields. And input Tags are NOT in order.
			name:   "treat tags as fields",
			plugin: func() *IoTDB { s := newIoTDB(); s.TreatTagsAs = "fields"; return s }(),
			expected: recordsWithTags{
				DeviceIDList:     []string{"root.computer.fields"},
				MeasurementsList: [][]string{{"temperature", "counter", "owner", "price"}},
				ValuesList: [][]interface{}{
					{float64(42.55), int64(987654321), "cpu", "expensive"},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64, client.TEXT, client.TEXT},
				},
				TimestampList: []int64{testTimestamp.UnixNano()},
			},
			input: recordsWithTags{
				DeviceIDList:     []string{"root.computer.fields"},
				MeasurementsList: [][]string{{"temperature", "counter"}},
				ValuesList: [][]interface{}{
					{float64(42.55), int64(987654321)},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64},
				},
				TimestampList: []int64{testTimestamp.UnixNano()},
				TagsList: [][]*telegraf.Tag{{
					{Key: "owner", Value: "cpu"},
					{Key: "price", Value: "expensive"},
				}},
			},
		},
		{ // treat tags as device IDs. And input Tags are in order.
			name:   "treat tags as device IDs",
			plugin: func() *IoTDB { s := newIoTDB(); s.TreatTagsAs = "device_id"; return s }(),
			expected: recordsWithTags{
				DeviceIDList:     []string{"root.computer.deviceID.cpu.expensive"},
				MeasurementsList: [][]string{{"temperature", "counter"}},
				ValuesList: [][]interface{}{
					{float64(42.55), int64(987654321)},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64},
				},
				TimestampList: []int64{testTimestamp.UnixNano()},
			},
			input: recordsWithTags{
				DeviceIDList:     []string{"root.computer.deviceID"},
				MeasurementsList: [][]string{{"temperature", "counter"}},
				ValuesList: [][]interface{}{
					{float64(42.55), int64(987654321)},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64},
				},
				TimestampList: []int64{testTimestamp.UnixNano()},
				TagsList: [][]*telegraf.Tag{{
					{Key: "owner", Value: "cpu"},
					{Key: "price", Value: "expensive"},
				}},
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// The plugin modifies this local copy, so the assertion below must
			// inspect the same copy. Checking tt.input instead would only work
			// as long as every change happens to go through slice storage the
			// two structs still share.
			input := tt.input
			tt.plugin.Log = &testutil.Logger{}
			require.NoError(t, tt.plugin.Init())
			require.NoError(t, tt.plugin.modifyRecordsWithTags(&input))
			// Ignore the tags-list for comparison
			input.TagsList = nil
			require.EqualValues(t, tt.expected, input)
		})
	}
}

// TestEntireMetricConversion tests the whole conversion from metrics to
// records which are ready to insert: `convertMetricsToRecordsWithTags`
// followed by `modifyRecordsWithTags`. Cases with `requireEqual` set must
// produce exactly the expected records; the remaining cases must NOT match
// their (deliberately wrong) expectation.
//
// Subtest names are unique so that a failing case can be identified and
// selected with `-run` without relying on the `#01` suffix Go appends to
// duplicated subtest names.
func TestEntireMetricConversion(t *testing.T) {
	var testTimestamp = time.Date(2022, time.July, 20, 12, 25, 33, 44, time.UTC)

	tests := []struct {
		name         string
		plugin       *IoTDB
		expected     recordsWithTags
		metrics      []telegraf.Metric
		requireEqual bool
	}{
		{
			name:   "default config",
			plugin: newIoTDB(),
			expected: recordsWithTags{
				DeviceIDList: []string{"root.computer.screen.high.LED"},
				MeasurementsList: [][]string{
					{"temperature", "counter", "unsigned_big", "string", "bool", "int_text"},
				},
				ValuesList: [][]interface{}{
					{float64(30.33), int64(123456789), int64(math.MaxInt64), "Made in China.", bool(false), "123456789011"},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64, client.INT64, client.TEXT, client.BOOLEAN, client.TEXT},
				},
				TimestampList: []int64{testTimestamp.UnixNano()},
			},
			metrics: []telegraf.Metric{
				newMetricWithOrderedFields(
					"root.computer.screen",
					[]telegraf.Tag{
						{Key: "brightness", Value: "high"},
						{Key: "type", Value: "LED"},
					},
					[]telegraf.Field{
						{Key: "temperature", Value: float64(30.33)},
						{Key: "counter", Value: int64(123456789)},
						{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
						{Key: "string", Value: "Made in China."},
						{Key: "bool", Value: bool(false)},
						{Key: "int_text", Value: "123456789011"},
					},
					testTimestamp,
				),
			},
			requireEqual: true,
		},
		{ // expected device ID lists the tag values in swapped order, so it must not match
			name:   "wrong order of tags",
			plugin: newIoTDB(),
			expected: recordsWithTags{
				DeviceIDList: []string{"root.computer.screen.LED.high"},
				MeasurementsList: [][]string{
					{"temperature", "counter", "unsigned_big", "string", "bool", "int_text"},
				},
				ValuesList: [][]interface{}{
					{float64(30.33), int64(123456789), int64(math.MaxInt64), "Made in China.", bool(false), "123456789011"},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64, client.INT64, client.TEXT, client.BOOLEAN, client.TEXT},
				},
				TimestampList: []int64{testTimestamp.UnixNano()},
			},
			metrics: []telegraf.Metric{
				newMetricWithOrderedFields(
					"root.computer.screen",
					[]telegraf.Tag{
						{Key: "brightness", Value: "high"},
						{Key: "type", Value: "LED"},
					},
					[]telegraf.Field{
						{Key: "temperature", Value: float64(30.33)},
						{Key: "counter", Value: int64(123456789)},
						{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
						{Key: "string", Value: "Made in China."},
						{Key: "bool", Value: bool(false)},
						{Key: "int_text", Value: "123456789011"},
					},
					testTimestamp,
				),
			},
			requireEqual: false,
		},
		{ // same swapped device ID as above, with a metric carrying only two fields
			name:   "wrong order of tags with fewer fields",
			plugin: newIoTDB(),
			expected: recordsWithTags{
				DeviceIDList: []string{"root.computer.screen.LED.high"},
				MeasurementsList: [][]string{
					{"temperature", "counter"},
				},
				ValuesList: [][]interface{}{
					{float64(30.33), int64(123456789)},
				},
				DataTypesList: [][]client.TSDataType{
					{client.DOUBLE, client.INT64},
				},
				TimestampList: []int64{testTimestamp.UnixNano()},
			},
			metrics: []telegraf.Metric{
				newMetricWithOrderedFields(
					"root.computer.screen",
					[]telegraf.Tag{
						{Key: "brightness", Value: "high"},
						{Key: "type", Value: "LED"},
					},
					[]telegraf.Field{
						{Key: "temperature", Value: float64(30.33)},
						{Key: "counter", Value: int64(123456789)},
					},
					testTimestamp,
				),
			},
			requireEqual: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			tt.plugin.Log = &testutil.Logger{}
			require.NoError(t, tt.plugin.Init())
			actual, err := tt.plugin.convertMetricsToRecordsWithTags(tt.metrics)
			require.NoError(t, err)
			require.NoError(t, tt.plugin.modifyRecordsWithTags(actual))
			// Ignore the tags-list for comparison
			actual.TagsList = nil
			expected := tt.expected
			if tt.requireEqual {
				require.EqualValues(t, &expected, actual)
			} else {
				require.NotEqualValues(t, &expected, actual)
			}
		})
	}
}

// Start a container and do integration test.
func TestIntegrationInserts(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}
	const iotdbPort = "6667"

	container := testutil.Container{
		Image:        "apache/iotdb:0.13.0-node",
		ExposedPorts: []string{iotdbPort},
		WaitingFor: wait.ForAll(
			wait.ForListeningPort(iotdbPort),
			wait.ForLog("IoTDB has started."),
		),
	}
	err := container.Start()
	require.NoError(t, err, "failed to start IoTDB container")
	defer container.Terminate()

	t.Logf("Container Address:%q, ExposedPorts:[%v:%v]", container.Address, container.Ports[iotdbPort], iotdbPort)
	// create a client and tests two groups of insertion
	testClient := &IoTDB{
		Host:            container.Address,
		Port:            container.Ports[iotdbPort],
		User:            config.NewSecret([]byte("root")),
		Password:        config.NewSecret([]byte("root")),
		Timeout:         config.Duration(time.Second * 5),
		ConvertUint64To: "int64_clip",
		TimeStampUnit:   "nanosecond",
		TreatTagsAs:     "device_id",
	}
	testClient.Log = &testutil.Logger{}

	// generate Metrics to input
	metrics := []telegraf.Metric{
		newMetricWithOrderedFields(
			"root.computer.unsigned_big",
			nil,
			[]telegraf.Field{
				{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
			},
			time.Now(),
		),
		newMetricWithOrderedFields(
			"root.computer.fan",
			[]telegraf.Tag{
				{Key: "price", Value: "expensive"},
				{Key: "owner", Value: "cpu"},
			},
			[]telegraf.Field{
				{Key: "temperature", Value: float64(42.55)},
				{Key: "counter", Value: int64(987654321)},
			},
			time.Now(),
		),
		newMetricWithOrderedFields(
			"root.computer.fan",
			[]telegraf.Tag{ // same keys in different order
				{Key: "owner", Value: "gpu"},
				{Key: "price", Value: "cheap"},
			},
			[]telegraf.Field{ // same keys in different order
				{Key: "counter", Value: int64(123456789)},
				{Key: "temperature", Value: float64(56.24)},
			},
			time.Now(),
		),
		newMetricWithOrderedFields(
			"root.computer.keyboard",
			nil,
			[]telegraf.Field{
				{Key: "temperature", Value: float64(30.33)},
				{Key: "counter", Value: int64(123456789)},
				{Key: "unsigned_big", Value: uint64(math.MaxInt64 + 1000)},
				{Key: "string", Value: "Made in China."},
				{Key: "bool", Value: bool(false)},
				{Key: "int_text", Value: "123456789011"},
			},
			time.Now(),
		),
	}

	require.NoError(t, testClient.Connect())
	require.NoError(t, testClient.Write(metrics))
	require.NoError(t, testClient.Close())
}
