//go:generate ../../../tools/readme_config_includer/generator
package iotdb

import (
	_ "embed"
	"errors"
	"fmt"
	"math"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/apache/iotdb-client-go/client"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/internal/choice"
	"github.com/influxdata/telegraf/plugins/outputs"
)

//go:embed sample.conf
var sampleConfig string

// matches any word that has a non valid backtick
// `word`  							 <- doesn't match
// “word , `wo`rd` , `word , word`   <- match
var forbiddenBacktick = regexp.MustCompile("^[^\x60].*?[\x60]+.*?[^\x60]$|^[\x60].*[\x60]+.*[\x60]$|^[\x60]+.*[^\x60]$|^[^\x60].*[\x60]+$")
var allowedBacktick = regexp.MustCompile("^[\x60].*[\x60]$")

type IoTDB struct {
	Host            string          `toml:"host"`
	Port            string          `toml:"port"`
	User            config.Secret   `toml:"user"`
	Password        config.Secret   `toml:"password"`
	Timeout         config.Duration `toml:"timeout"`
	ConvertUint64To string          `toml:"uint64_conversion"`
	TimeStampUnit   string          `toml:"timestamp_precision"`
	TreatTagsAs     string          `toml:"convert_tags_to"`
	SanitizeTags    string          `toml:"sanitize_tag"`
	Log             telegraf.Logger `toml:"-"`

	sanityRegex []*regexp.Regexp
	session     *client.Session
}

type recordsWithTags struct {
	// IoTDB Records basic data struct
	DeviceIDList     []string
	MeasurementsList [][]string
	ValuesList       [][]interface{}
	DataTypesList    [][]client.TSDataType
	TimestampList    []int64
	// extra tags
	TagsList [][]*telegraf.Tag
}

func (*IoTDB) SampleConfig() string {
	return sampleConfig
}

// Init is for setup, and validating config.
func (s *IoTDB) Init() error {
	if s.Timeout < 0 {
		return errors.New("negative timeout")
	}
	if !choice.Contains(s.ConvertUint64To, []string{"int64", "int64_clip", "text"}) {
		return fmt.Errorf("unknown 'uint64_conversion' method %q", s.ConvertUint64To)
	}
	if !choice.Contains(s.TimeStampUnit, []string{"second", "millisecond", "microsecond", "nanosecond"}) {
		return fmt.Errorf("unknown 'timestamp_precision' method %q", s.TimeStampUnit)
	}
	if !choice.Contains(s.TreatTagsAs, []string{"fields", "device_id"}) {
		return fmt.Errorf("unknown 'convert_tags_to' method %q", s.TreatTagsAs)
	}

	if s.User.Empty() {
		s.User.Destroy()
		s.User = config.NewSecret([]byte("root"))
	}
	if s.Password.Empty() {
		s.Password.Destroy()
		s.Password = config.NewSecret([]byte("root"))
	}

	switch s.SanitizeTags {
	case "0.13":
		matchUnsupportedCharacter := regexp.MustCompile("[^0-9a-zA-Z_:@#${}\x60]")

		regex := []*regexp.Regexp{matchUnsupportedCharacter}
		s.sanityRegex = append(s.sanityRegex, regex...)

	// from version 1.x.x IoTDB changed the allowed keys in nodes
	case "1.0", "1.1", "1.2", "1.3":
		matchUnsupportedCharacter := regexp.MustCompile("[^0-9a-zA-Z_\x60]")
		matchNumericString := regexp.MustCompile(`^\d+$`)

		regex := []*regexp.Regexp{matchUnsupportedCharacter, matchNumericString}
		s.sanityRegex = append(s.sanityRegex, regex...)
	}

	s.Log.Info("Initialization completed.")
	return nil
}

func (s *IoTDB) Connect() error {
	username, err := s.User.Get()
	if err != nil {
		return fmt.Errorf("getting username failed: %w", err)
	}
	password, err := s.Password.Get()
	if err != nil {
		username.Destroy()
		return fmt.Errorf("getting password failed: %w", err)
	}
	defer password.Destroy()
	sessionConf := &client.Config{
		Host:     s.Host,
		Port:     s.Port,
		UserName: username.String(),
		Password: password.String(),
	}
	username.Destroy()
	password.Destroy()

	var ss = client.NewSession(sessionConf)
	s.session = &ss
	timeoutInMs := int(time.Duration(s.Timeout).Milliseconds())
	if err := s.session.Open(false, timeoutInMs); err != nil {
		return fmt.Errorf("connecting to %s:%s failed: %w", s.Host, s.Port, err)
	}
	return nil
}

func (s *IoTDB) Close() error {
	return s.session.Close()
}

// Write should write immediately to the output, and not buffer writes
// (Telegraf manages the buffer for you). Returning an error will fail this
// batch of writes and the entire batch will be retried automatically.
func (s *IoTDB) Write(metrics []telegraf.Metric) error {
	// Convert Metrics to Records with Tags
	rwt, err := s.convertMetricsToRecordsWithTags(metrics)
	if err != nil {
		return err
	}
	// Write to client.
	// If first writing fails, the client will automatically retry three times. If all fail, it returns an error.
	if err := s.writeRecordsWithTags(rwt); err != nil {
		return fmt.Errorf("write failed: %w", err)
	}
	return nil
}

// Find out data type of the value and return it's id in TSDataType, and convert it if necessary.
func (s *IoTDB) getDataTypeAndValue(value interface{}) (client.TSDataType, interface{}) {
	switch v := value.(type) {
	case int32:
		return client.INT32, v
	case int64:
		return client.INT64, v
	case uint32:
		return client.INT64, int64(v)
	case uint64:
		switch s.ConvertUint64To {
		case "int64_clip":
			if v <= uint64(math.MaxInt64) {
				return client.INT64, int64(v)
			}
			return client.INT64, int64(math.MaxInt64)
		case "int64":
			return client.INT64, int64(v)
		case "text":
			return client.TEXT, strconv.FormatUint(v, 10)
		default:
			return client.UNKNOWN, int64(0)
		}
	case float64:
		return client.DOUBLE, v
	case string:
		return client.TEXT, v
	case bool:
		return client.BOOLEAN, v
	default:
		return client.UNKNOWN, int64(0)
	}
}

// convert Timestamp Unit according to config
func (s *IoTDB) convertTimestampOfMetric(m telegraf.Metric) (int64, error) {
	switch s.TimeStampUnit {
	case "second":
		return m.Time().Unix(), nil
	case "millisecond":
		return m.Time().UnixMilli(), nil
	case "microsecond":
		return m.Time().UnixMicro(), nil
	case "nanosecond":
		return m.Time().UnixNano(), nil
	default:
		return 0, fmt.Errorf("unknown timestamp_precision %q", s.TimeStampUnit)
	}
}

// convert Metrics to Records with tags
func (s *IoTDB) convertMetricsToRecordsWithTags(metrics []telegraf.Metric) (*recordsWithTags, error) {
	timestampList := make([]int64, 0, len(metrics))
	deviceidList := make([]string, 0, len(metrics))
	measurementsList := make([][]string, 0, len(metrics))
	valuesList := make([][]interface{}, 0, len(metrics))
	dataTypesList := make([][]client.TSDataType, 0, len(metrics))
	tagsList := make([][]*telegraf.Tag, 0, len(metrics))

	for _, metric := range metrics {
		// write `metric` to the output sink here
		// deal with basic parameter
		keys := make([]string, 0, len(metric.FieldList()))
		values := make([]interface{}, 0, len(metric.FieldList()))
		dataTypes := make([]client.TSDataType, 0, len(metric.FieldList()))
		for _, field := range metric.FieldList() {
			datatype, value := s.getDataTypeAndValue(field.Value)
			if datatype == client.UNKNOWN {
				return nil, fmt.Errorf("datatype of %q is unknown, values: %v", field.Key, field.Value)
			}
			keys = append(keys, field.Key)
			values = append(values, value)
			dataTypes = append(dataTypes, datatype)
		}
		// Convert timestamp into specified unit
		ts, err := s.convertTimestampOfMetric(metric)
		if err != nil {
			return nil, err
		}
		timestampList = append(timestampList, ts)
		// append all metric data of this record to lists
		deviceidList = append(deviceidList, metric.Name())
		measurementsList = append(measurementsList, keys)
		valuesList = append(valuesList, values)
		dataTypesList = append(dataTypesList, dataTypes)
		tagsList = append(tagsList, metric.TagList())
	}
	rwt := &recordsWithTags{
		DeviceIDList:     deviceidList,
		MeasurementsList: measurementsList,
		ValuesList:       valuesList,
		DataTypesList:    dataTypesList,
		TimestampList:    timestampList,
		TagsList:         tagsList,
	}
	return rwt, nil
}

// checks is the tag contains any IoTDB invalid character
func (s *IoTDB) validateTag(tag string) (string, error) {
	// IoTDB uses "root" as a keyword and can be called only at the start of the path
	if tag == "root" {
		return "", errors.New("cannot use 'root' as tag")
	} else if forbiddenBacktick.MatchString(tag) { // returns an error if the backsticks are used in an inappropriate way
		return "", errors.New("cannot use ` in tag names")
	} else if allowedBacktick.MatchString(tag) { // if the tag in already enclosed in tags returns the tag
		return tag, nil
	}

	// loops through all the regex patterns and if one
	// pattern matches returns the tag between `
	for _, regex := range s.sanityRegex {
		if regex.MatchString(tag) {
			return "`" + tag + "`", nil
		}
	}

	return tag, nil
}

// modify recordsWithTags according to 'TreatTagsAs' Configuration
func (s *IoTDB) modifyRecordsWithTags(rwt *recordsWithTags) error {
	switch s.TreatTagsAs {
	case "fields":
		// method 1: treat Tag(Key:Value) as measurement
		for index, tags := range rwt.TagsList { // for each record
			for _, tag := range tags { // for each tag of this record, append it's Key:Value to measurements
				datatype, value := s.getDataTypeAndValue(tag.Value)
				if datatype == client.UNKNOWN {
					return fmt.Errorf("datatype of %q is unknown, values: %v", tag.Key, value)
				}
				rwt.MeasurementsList[index] = append(rwt.MeasurementsList[index], tag.Key)
				rwt.ValuesList[index] = append(rwt.ValuesList[index], value)
				rwt.DataTypesList[index] = append(rwt.DataTypesList[index], datatype)
			}
		}
		return nil
	case "device_id":
		// method 2: treat Tag(Key:Value) as subtree of device id
		for index, tags := range rwt.TagsList { // for each record
			topic := make([]string, 0, len(tags)+1)
			topic = append(topic, rwt.DeviceIDList[index])
			for _, tag := range tags { // for each tag, append it's Value
				tagValue, err := s.validateTag(tag.Value) // validates tag
				if err != nil {
					return err
				}
				topic = append(topic, tagValue)
			}
			rwt.DeviceIDList[index] = strings.Join(topic, ".")
		}
		return nil
	default:
		// something go wrong. This configuration should have been checked in func Init().
		return fmt.Errorf("unknown 'convert_tags_to' method: %q", s.TreatTagsAs)
	}
}

// Write records with tags to IoTDB server
func (s *IoTDB) writeRecordsWithTags(rwt *recordsWithTags) error {
	// deal with tags
	if err := s.modifyRecordsWithTags(rwt); err != nil {
		return err
	}
	// write to IoTDB server
	return s.session.InsertRecords(
		rwt.DeviceIDList,
		rwt.MeasurementsList,
		rwt.DataTypesList,
		rwt.ValuesList,
		rwt.TimestampList,
	)
}

func init() {
	outputs.Add("iotdb", func() telegraf.Output { return newIoTDB() })
}

// create a new IoTDB struct with default values.
func newIoTDB() *IoTDB {
	return &IoTDB{
		Host:            "localhost",
		Port:            "6667",
		Timeout:         config.Duration(time.Second * 5),
		ConvertUint64To: "int64_clip",
		TimeStampUnit:   "nanosecond",
		TreatTagsAs:     "device_id",
	}
}
