//go:generate ../../../tools/readme_config_includer/generator
package parser

import (
	"bytes"
	_ "embed"
	"encoding/base64"
	gobin "encoding/binary"
	"fmt"
	"slices"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/plugins/processors"
)

//go:embed sample.conf
var sampleConfig string

// Parser is a processor that runs a configured data-format parser over
// selected fields and/or tags of incoming metrics and emits the parsed
// metrics according to the configured merge strategy.
type Parser struct {
	// DropOriginal drops the input metric instead of passing it through.
	DropOriginal bool            `toml:"drop_original"`
	// Merge controls how parsed metrics are combined with the original;
	// one of "none", "override", "override-with-timestamp", "parent" or
	// "parent-with-timestamp" (validated in Init; "" defaults to "none").
	Merge        string          `toml:"merge"`
	// ParseFields lists field keys whose values are fed to the parser as-is.
	ParseFields  []string        `toml:"parse_fields"`
	// Base64Fields lists field keys whose values are base64-decoded first.
	Base64Fields []string        `toml:"parse_fields_base64"`
	// ParseTags lists tag keys whose values are fed to the parser.
	ParseTags    []string        `toml:"parse_tags"`
	Log          telegraf.Logger `toml:"-"`
	// parser is the configured data-format parser, injected via SetParser.
	parser       telegraf.Parser
}

// SampleConfig returns the embedded sample configuration for this plugin.
func (*Parser) SampleConfig() string {
	return sampleConfig
}

// Init validates the configured merge mode, defaulting an empty setting
// to "none". It returns an error for any unrecognized value.
func (p *Parser) Init() error {
	if p.Merge == "" {
		p.Merge = "none"
		return nil
	}

	allowed := []string{"none", "override", "override-with-timestamp", "parent", "parent-with-timestamp"}
	if !slices.Contains(allowed, p.Merge) {
		return fmt.Errorf("unrecognized merge value: %s", p.Merge)
	}

	return nil
}

// SetParser stores the data-format parser used by Apply to parse the
// configured field and tag values.
func (p *Parser) SetParser(parser telegraf.Parser) {
	p.parser = parser
}

// Apply runs the configured parser over the selected fields and tags of
// each metric and emits the results according to the merge strategy.
// Problems with individual fields/tags are logged and skipped so one bad
// value does not discard the rest of the metric.
func (p *Parser) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
	results := make([]telegraf.Metric, 0, len(metrics))
	for _, metric := range metrics {
		var newMetrics []telegraf.Metric
		if !p.DropOriginal {
			newMetrics = append(newMetrics, metric)
		} else {
			metric.Drop()
		}

		// parse fields
		for _, field := range metric.FieldList() {
			plain := slices.Contains(p.ParseFields, field.Key)
			b64 := slices.Contains(p.Base64Fields, field.Key)

			if !plain && !b64 {
				continue
			}

			if plain && b64 {
				p.Log.Errorf("field %s is listed in both parse fields and base64 fields; skipping", field.Key)
				continue
			}

			value, err := toBytes(field.Value)
			if err != nil {
				p.Log.Errorf("could not convert field %s: %v; skipping", field.Key, err)
				continue
			}

			if b64 {
				decoded := make([]byte, base64.StdEncoding.DecodedLen(len(value)))
				n, err := base64.StdEncoding.Decode(decoded, value)
				if err != nil {
					p.Log.Errorf("could not decode base64 field %s: %v; skipping", field.Key, err)
					continue
				}
				value = decoded[:n]
			}

			fromFieldMetric, err := p.parser.Parse(value)
			if err != nil {
				p.Log.Errorf("could not parse field %s: %v", field.Key, err)
				continue
			}
			keepOriginalName(fromFieldMetric, metric.Name())

			// multiple parsed fields shouldn't create multiple
			// metrics so we'll merge tags/fields down into one
			// prior to returning.
			newMetrics = append(newMetrics, fromFieldMetric...)
		}

		// parse tags
		for _, key := range p.ParseTags {
			value, ok := metric.GetTag(key)
			if !ok {
				continue
			}

			fromTagMetric, err := p.parseValue(value)
			if err != nil {
				p.Log.Errorf("could not parse tag %s: %v", key, err)
				// Skip on error for consistency with the field loop
				// instead of falling through with a nil result.
				continue
			}
			keepOriginalName(fromTagMetric, metric.Name())

			newMetrics = append(newMetrics, fromTagMetric...)
		}

		if len(newMetrics) == 0 {
			continue
		}

		switch p.Merge {
		case "override":
			results = append(results, mergeAll(newMetrics[0], newMetrics[1:], false))
		case "override-with-timestamp":
			results = append(results, mergeAll(newMetrics[0], newMetrics[1:], true))
		case "parent":
			results = append(results, mergeIndividual(metric, newMetrics, false)...)
		case "parent-with-timestamp":
			results = append(results, mergeIndividual(metric, newMetrics, true)...)
		default:
			results = append(results, newMetrics...)
		}
	}
	return results
}

// keepOriginalName restores the parent metric's name on parsed metrics
// that did not provide one. The parser gets the parent plugin's name as
// the default measurement name, so a parsed metric without a name of its
// own comes back as "parser" (we are in processors.parser); in those
// cases keep the original metric name instead.
func keepOriginalName(parsed []telegraf.Metric, name string) {
	for _, m := range parsed {
		if m.Name() == "" || m.Name() == "parser" {
			m.SetName(name)
		}
	}
}

// mergeIndividual produces one merged metric per parsed metric: each result
// is a copy of the parent with the parsed metric's fields, tags and name
// applied on top. When mergeTime is set and the parsed metric carries a
// non-zero timestamp, that timestamp replaces the parent's.
func mergeIndividual(base telegraf.Metric, metrics []telegraf.Metric, mergeTime bool) []telegraf.Metric {
	merged := make([]telegraf.Metric, 0, len(metrics))
	for _, parsed := range metrics {
		m := base.Copy()
		for _, f := range parsed.FieldList() {
			m.AddField(f.Key, f.Value)
		}
		for _, t := range parsed.TagList() {
			m.AddTag(t.Key, t.Value)
		}
		m.SetName(parsed.Name())
		if mergeTime && !parsed.Time().IsZero() {
			m.SetTime(parsed.Time())
		}
		merged = append(merged, m)
	}

	return merged
}

// mergeAll folds every parsed metric into the base metric in order, so
// later fields, tags and names override earlier ones. When mergeTime is
// set, the last parsed metric with a non-zero timestamp also overrides
// the base timestamp. The (mutated) base is returned.
func mergeAll(base telegraf.Metric, metrics []telegraf.Metric, mergeTime bool) telegraf.Metric {
	for _, parsed := range metrics {
		for _, f := range parsed.FieldList() {
			base.AddField(f.Key, f.Value)
		}
		for _, t := range parsed.TagList() {
			base.AddTag(t.Key, t.Value)
		}
		base.SetName(parsed.Name())

		if mergeTime && !parsed.Time().IsZero() {
			base.SetTime(parsed.Time())
		}
	}
	return base
}

// parseValue runs the configured data-format parser over a single string
// value and returns the resulting metrics.
func (p *Parser) parseValue(value string) ([]telegraf.Metric, error) {
	return p.parser.Parse([]byte(value))
}

// toBytes converts a field value to its raw byte representation. Strings
// are converted directly; any other value is encoded with encoding/binary
// using the host's native byte order. An error from binary encoding (e.g.
// for unsupported types) is returned unchanged.
func toBytes(value any) ([]byte, error) {
	if v, ok := value.(string); ok {
		return []byte(v), nil
	}

	var buf bytes.Buffer
	if err := gobin.Write(&buf, internal.HostEndianness, value); err != nil {
		return nil, err
	}

	return buf.Bytes(), nil
}

// init registers the processor with Telegraf under the name "parser".
func init() {
	processors.Add("parser", func() telegraf.Processor {
		// The zero value keeps the original metrics (DropOriginal false).
		return new(Parser)
	})
}
