package postgresql

import (
	"strings"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/plugins/outputs/postgresql/sqltemplate"
	"github.com/influxdata/telegraf/plugins/outputs/postgresql/utils"
)

func TestTableManagerIntegration_EnsureStructure(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	require.NoError(t, p.Connect())

	cols := []utils.Column{
		p.columnFromTag("foo", ""),
		p.columnFromField("baz", 0),
	}
	missingCols, err := p.tableManager.EnsureStructure(
		ctx,
		p.db,
		p.tableManager.table(t.Name()),
		cols,
		p.CreateTemplates,
		p.AddColumnTemplates,
		p.tableManager.table(t.Name()),
		nil,
	)
	require.NoError(t, err)
	require.Empty(t, missingCols)

	tblCols := p.tableManager.table(t.Name()).columns
	require.EqualValues(t, cols[0], tblCols["foo"])
	require.EqualValues(t, cols[1], tblCols["baz"])
}

func TestTableManagerIntegration_EnsureStructure_alter(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	require.NoError(t, p.Connect())

	cols := make([]utils.Column, 0, 3)
	cols = append(cols,
		p.columnFromTag("foo", ""),
		p.columnFromField("bar", 0),
	)
	_, err = p.tableManager.EnsureStructure(
		ctx,
		p.db,
		p.tableManager.table(t.Name()),
		cols,
		p.CreateTemplates,
		p.AddColumnTemplates,
		p.tableManager.table(t.Name()),
		nil,
	)
	require.NoError(t, err)

	cols = append(cols, p.columnFromField("baz", 0))
	missingCols, err := p.tableManager.EnsureStructure(
		ctx,
		p.db,
		p.tableManager.table(t.Name()),
		cols,
		p.CreateTemplates,
		p.AddColumnTemplates,
		p.tableManager.table(t.Name()),
		nil,
	)
	require.NoError(t, err)
	require.Empty(t, missingCols)

	tblCols := p.tableManager.table(t.Name()).columns
	require.EqualValues(t, cols[0], tblCols["foo"])
	require.EqualValues(t, cols[1], tblCols["bar"])
	require.EqualValues(t, cols[2], tblCols["baz"])
}

func TestTableManagerIntegration_EnsureStructure_overflowTableName(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	require.NoError(t, p.Connect())

	tbl := p.tableManager.table("ăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăă") // 32 2-byte unicode characters = 64 bytes
	cols := []utils.Column{
		p.columnFromField("foo", 0),
	}
	_, err = p.tableManager.EnsureStructure(
		ctx,
		p.db,
		tbl,
		cols,
		p.CreateTemplates,
		p.AddColumnTemplates,
		tbl,
		nil,
	)
	require.Error(t, err)
	require.Contains(t, err.Error(), "table name too long")
	require.False(t, isTempError(err))
}

func TestTableManagerIntegration_EnsureStructure_overflowTagName(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	require.NoError(t, p.Connect())

	tbl := p.tableManager.table(t.Name())
	cols := []utils.Column{
		p.columnFromTag("ăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăă", "a"), // 32 2-byte unicode characters = 64 bytes
		p.columnFromField("foo", 0),
	}
	_, err = p.tableManager.EnsureStructure(
		ctx,
		p.db,
		tbl,
		cols,
		p.CreateTemplates,
		p.AddColumnTemplates,
		tbl,
		nil,
	)
	require.Error(t, err)
	require.False(t, isTempError(err))
}

func TestTableManagerIntegration_EnsureStructure_overflowFieldName(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	require.NoError(t, p.Connect())

	tbl := p.tableManager.table(t.Name())
	cols := []utils.Column{
		p.columnFromField("foo", 0),
		p.columnFromField("ăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăăă", 0),
	}
	missingCols, err := p.tableManager.EnsureStructure(
		ctx,
		p.db,
		tbl,
		cols,
		p.CreateTemplates,
		p.AddColumnTemplates,
		tbl,
		nil,
	)
	require.NoError(t, err)
	require.Len(t, missingCols, 1)
	require.Equal(t, cols[1], missingCols[0])
}

func TestTableManagerIntegration_getColumns(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	require.NoError(t, p.Connect())

	cols := []utils.Column{
		p.columnFromTag("foo", ""),
		p.columnFromField("baz", 0),
	}
	_, err = p.tableManager.EnsureStructure(
		ctx,
		p.db,
		p.tableManager.table(t.Name()),
		cols,
		p.CreateTemplates,
		p.AddColumnTemplates,
		p.tableManager.table(t.Name()),
		nil,
	)
	require.NoError(t, err)

	p.tableManager.ClearTableCache()
	require.Empty(t, p.tableManager.table(t.Name()).columns)

	curCols, err := p.tableManager.getColumns(ctx, p.db, t.Name())
	require.NoError(t, err)

	require.EqualValues(t, cols[0], curCols["foo"])
	require.EqualValues(t, cols[1], curCols["baz"])
}

func TestTableManagerIntegration_MatchSource(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	p.TagsAsForeignKeys = true
	require.NoError(t, p.Connect())

	metrics := []telegraf.Metric{
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
	}
	tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]

	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
	require.Contains(t, p.tableManager.table(t.Name()+p.TagTableSuffix).columns, "tag")
	require.Contains(t, p.tableManager.table(t.Name()).columns, "a")
}

func TestTableManagerIntegration_MatchSource_UnsignedIntegers(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	p.Uint64Type = PgUint8
	require.NoError(t, p.Init())
	if err := p.Connect(); err != nil {
		if strings.Contains(err.Error(), "retrieving OID for uint8 data type") {
			t.Skipf("pguint extension is not installed")
			t.SkipNow()
		}
		require.NoError(t, err)
	}

	metrics := []telegraf.Metric{
		newMetric(t, "", nil, MSI{"a": uint64(1)}),
	}
	tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]

	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
	require.Equal(t, PgUint8, p.tableManager.table(t.Name()).columns["a"].Type)
}

func TestTableManagerIntegration_noCreateTable(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	p.CreateTemplates = nil
	require.NoError(t, p.Connect())

	metrics := []telegraf.Metric{
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
	}
	tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]

	require.Error(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
}

func TestTableManagerIntegration_noCreateTagTable(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	p.TagTableCreateTemplates = nil
	p.TagsAsForeignKeys = true
	require.NoError(t, p.Connect())

	metrics := []telegraf.Metric{
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
	}
	tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]

	require.Error(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
}

// verify that TableManager updates & caches the DB table structure unless the incoming metric can't fit.
func TestTableManagerIntegration_cache(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	p.TagsAsForeignKeys = true
	require.NoError(t, p.Connect())

	metrics := []telegraf.Metric{
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
	}
	tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]

	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
}

// Verify that when alter statements are disabled and a metric comes in with a new tag key, that the tag is omitted.
func TestTableManagerIntegration_noAlterMissingTag(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	p.AddColumnTemplates = make([]*sqltemplate.Template, 0)
	require.NoError(t, p.Connect())

	metrics := []telegraf.Metric{
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
	}
	tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))

	metrics = []telegraf.Metric{
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 2}),
		newMetric(t, "", MSS{"tag": "foo", "bar": "baz"}, MSI{"a": 3}),
	}
	tsrc = NewTableSources(p.Postgresql, metrics)[t.Name()]
	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
	require.NotContains(t, tsrc.ColumnNames(), "bar")
}

// Verify that when using foreign tags and alter statements are disabled and a metric comes in with a new tag key, that
// the tag is omitted.
func TestTableManagerIntegration_noAlterMissingTagTableTag(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	p.TagsAsForeignKeys = true
	p.TagTableAddColumnTemplates = make([]*sqltemplate.Template, 0)
	require.NoError(t, p.Connect())

	metrics := []telegraf.Metric{
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
	}
	tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))

	metrics = []telegraf.Metric{
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 2}),
		newMetric(t, "", MSS{"tag": "foo", "bar": "baz"}, MSI{"a": 3}),
	}
	tsrc = NewTableSources(p.Postgresql, metrics)[t.Name()]
	ttsrc := NewTagTableSource(tsrc)
	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
	require.NotContains(t, ttsrc.ColumnNames(), "bar")
}

// Verify that when using foreign tags and alter statements generate a permanent error and a metric comes in with a new
// tag key, that the tag is omitted.
func TestTableManagerIntegration_badAlterTagTable(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	p.TagsAsForeignKeys = true
	tmpl := &sqltemplate.Template{}
	require.NoError(t, tmpl.UnmarshalText([]byte("bad")))
	p.TagTableAddColumnTemplates = []*sqltemplate.Template{tmpl}
	require.NoError(t, p.Connect())

	metrics := []telegraf.Metric{
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
	}
	tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))

	metrics = []telegraf.Metric{
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 2}),
		newMetric(t, "", MSS{"tag": "foo", "bar": "baz"}, MSI{"a": 3}),
	}
	tsrc = NewTableSources(p.Postgresql, metrics)[t.Name()]
	ttsrc := NewTagTableSource(tsrc)
	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
	require.NotContains(t, ttsrc.ColumnNames(), "bar")
}

// verify that when alter statements are disabled and a metric comes in with a new field key, that the field is omitted.
func TestTableManagerIntegration_noAlterMissingField(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	p.AddColumnTemplates = make([]*sqltemplate.Template, 0)
	require.NoError(t, p.Connect())

	metrics := []telegraf.Metric{
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
	}
	tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))

	metrics = []telegraf.Metric{
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 2}),
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 3, "b": 3}),
	}
	tsrc = NewTableSources(p.Postgresql, metrics)[t.Name()]
	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
	require.NotContains(t, tsrc.ColumnNames(), "b")
}

// verify that when alter statements generate a permanent error and a metric comes in with a new field key, that the field is omitted.
func TestTableManagerIntegration_badAlterField(t *testing.T) {
	if testing.Short() {
		t.Skip("Skipping integration test in short mode")
	}

	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	tmpl := &sqltemplate.Template{}
	require.NoError(t, tmpl.UnmarshalText([]byte("bad")))
	p.AddColumnTemplates = []*sqltemplate.Template{tmpl}
	require.NoError(t, p.Connect())

	metrics := []telegraf.Metric{
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 1}),
	}
	tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))

	metrics = []telegraf.Metric{
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 2}),
		newMetric(t, "", MSS{"tag": "foo"}, MSI{"a": 3, "b": 3}),
	}
	tsrc = NewTableSources(p.Postgresql, metrics)[t.Name()]
	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
	require.NotContains(t, tsrc.ColumnNames(), "b")
}

func TestTableManager_addColumnTemplates(t *testing.T) {
	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	p.TagsAsForeignKeys = true
	require.NoError(t, p.Connect())

	metrics := []telegraf.Metric{
		newMetric(t, "", MSS{"foo": "bar"}, MSI{"a": 1}),
	}
	tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))

	p, err = newPostgresqlTest(t)
	require.NoError(t, err)
	p.TagsAsForeignKeys = true
	tmpl := &sqltemplate.Template{}
	require.NoError(t, tmpl.UnmarshalText([]byte(`-- addColumnTemplate: {{ . }}`)))
	p.AddColumnTemplates = []*sqltemplate.Template{tmpl}

	require.NoError(t, p.Connect())

	metrics = []telegraf.Metric{
		newMetric(t, "", MSS{"pop": "tart"}, MSI{"a": 1, "b": 2}),
	}
	tsrc = NewTableSources(p.Postgresql, metrics)[t.Name()]
	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
	p.Logger.Info("ok")

	expected := `CREATE TABLE "public"."TestTableManager_addColumnTemplates" ("time" timestamp without time zone, "tag_id" bigint, "a" bigint, "b" bigint)`
	stmtCount := 0
	for _, log := range p.Logger.Logs() {
		if strings.Contains(log.String(), expected) {
			stmtCount++
		}
	}

	require.Equal(t, 1, stmtCount)
}

func TestTableManager_TimeWithTimezone(t *testing.T) {
	p, err := newPostgresqlTest(t)
	require.NoError(t, err)
	p.TagsAsForeignKeys = true
	p.TimestampColumnType = "timestamp with time zone"
	require.NoError(t, p.Init())
	require.NoError(t, p.Connect())

	metrics := []telegraf.Metric{
		newMetric(t, "", MSS{"pop": "tart"}, MSI{"a": 1, "b": 2}),
	}
	tsrc := NewTableSources(p.Postgresql, metrics)[t.Name()]
	require.NoError(t, p.tableManager.MatchSource(ctx, p.db, tsrc))
	p.Logger.Info("ok")

	expected := `CREATE TABLE "public"."TestTableManager_TimeWithTimezone" ("time" timestamp with time zone, "tag_id" bigint, "a" bigint, "b" bigint)`
	stmtCount := 0
	for _, log := range p.Logger.Logs() {
		if strings.Contains(log.String(), expected) {
			stmtCount++
		}
	}

	require.Equal(t, 1, stmtCount)
}
