//go:generate ../../../tools/readme_config_includer/generator
package sumologic

import (
	"bytes"
	"compress/gzip"
	_ "embed"
	"errors"
	"fmt"
	"net/http"
	"time"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/models"
	"github.com/influxdata/telegraf/plugins/outputs"
	"github.com/influxdata/telegraf/plugins/serializers/carbon2"
	"github.com/influxdata/telegraf/plugins/serializers/graphite"
	"github.com/influxdata/telegraf/plugins/serializers/prometheus"
)

//go:embed sample.conf
var sampleConfig string

const (
	defaultClientTimeout      = 5 * time.Second
	defaultMethod             = http.MethodPost
	defaultMaxRequestBodySize = 1000000

	contentTypeHeader     = "Content-Type"
	carbon2ContentType    = "application/vnd.sumologic.carbon2"
	graphiteContentType   = "application/vnd.sumologic.graphite"
	prometheusContentType = "application/vnd.sumologic.prometheus"
)

type header string

const (
	sourceNameHeader     header = `X-Sumo-Name`
	sourceHostHeader     header = `X-Sumo-Host`
	sourceCategoryHeader header = `X-Sumo-Category`
	dimensionsHeader     header = `X-Sumo-Dimensions`
)

type SumoLogic struct {
	URL                string          `toml:"url"`
	Timeout            config.Duration `toml:"timeout"`
	MaxRequestBodySize config.Size     `toml:"max_request_body_size"`

	SourceName     string `toml:"source_name"`
	SourceHost     string `toml:"source_host"`
	SourceCategory string `toml:"source_category"`
	Dimensions     string `toml:"dimensions"`

	Log telegraf.Logger `toml:"-"`

	client     *http.Client
	serializer telegraf.Serializer

	headers map[string]string
}

func (*SumoLogic) SampleConfig() string {
	return sampleConfig
}

func (s *SumoLogic) SetSerializer(serializer telegraf.Serializer) {
	s.serializer = serializer
}

func (s *SumoLogic) createClient() *http.Client {
	return &http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyFromEnvironment,
		},
		Timeout: time.Duration(s.Timeout),
	}
}

func (s *SumoLogic) Connect() error {
	s.headers = make(map[string]string)

	var serializer telegraf.Serializer
	if unwrapped, ok := s.serializer.(*models.RunningSerializer); ok {
		serializer = unwrapped.Serializer
	} else {
		serializer = s.serializer
	}

	switch serializer.(type) {
	case *carbon2.Serializer:
		s.headers[contentTypeHeader] = carbon2ContentType
	case *graphite.Serializer:
		s.headers[contentTypeHeader] = graphiteContentType
	case *prometheus.Serializer:
		s.headers[contentTypeHeader] = prometheusContentType
	default:
		return fmt.Errorf("unsupported serializer %T", serializer)
	}

	if s.Timeout == 0 {
		s.Timeout = config.Duration(defaultClientTimeout)
	}

	s.client = s.createClient()

	return nil
}

func (*SumoLogic) Close() error {
	return nil
}

func (s *SumoLogic) Write(metrics []telegraf.Metric) error {
	if s.serializer == nil {
		return errors.New("sumologic: serializer unset")
	}
	if len(metrics) == 0 {
		return nil
	}

	reqBody, err := s.serializer.SerializeBatch(metrics)
	if err != nil {
		return err
	}

	if l := len(reqBody); l > int(s.MaxRequestBodySize) {
		chunks, err := s.splitIntoChunks(metrics)
		if err != nil {
			return err
		}

		return s.writeRequestChunks(chunks)
	}

	return s.writeRequestChunk(reqBody)
}

func (s *SumoLogic) writeRequestChunks(chunks [][]byte) error {
	for _, reqChunk := range chunks {
		if err := s.writeRequestChunk(reqChunk); err != nil {
			s.Log.Errorf("Error sending chunk: %v", err)
		}
	}
	return nil
}

func (s *SumoLogic) writeRequestChunk(reqBody []byte) error {
	var (
		err  error
		buff bytes.Buffer
		gz   = gzip.NewWriter(&buff)
	)

	if _, err = gz.Write(reqBody); err != nil {
		return err
	}

	if err := gz.Close(); err != nil {
		return err
	}

	req, err := http.NewRequest(defaultMethod, s.URL, &buff)
	if err != nil {
		return err
	}

	req.Header.Set("Content-Encoding", "gzip")
	req.Header.Set("User-Agent", internal.ProductToken())

	// Set headers coming from the configuration.
	for k, v := range s.headers {
		req.Header.Set(k, v)
	}

	setHeaderIfSetInConfig(req, sourceNameHeader, s.SourceName)
	setHeaderIfSetInConfig(req, sourceHostHeader, s.SourceHost)
	setHeaderIfSetInConfig(req, sourceCategoryHeader, s.SourceCategory)
	setHeaderIfSetInConfig(req, dimensionsHeader, s.Dimensions)

	resp, err := s.client.Do(req)
	if err != nil {
		return fmt.Errorf("sumologic: failed sending request to %q: %w", s.URL, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("sumologic: when writing to %q received status code: %d", s.URL, resp.StatusCode)
	}

	return nil
}

// splitIntoChunks splits metrics to be sent into chunks so that every request
// is smaller than s.MaxRequestBodySize unless it was configured so small so that
// even a single metric cannot fit.
// In such a situation metrics will be sent one by one with a warning being logged
// for every request sent even though they don't fit in s.MaxRequestBodySize bytes.
func (s *SumoLogic) splitIntoChunks(metrics []telegraf.Metric) ([][]byte, error) {
	var (
		numMetrics = len(metrics)
		chunks     = make([][]byte, 0)
	)

	for i := 0; i < numMetrics; {
		var toAppend []byte
		for i < numMetrics {
			chunkBody, err := s.serializer.Serialize(metrics[i])
			if err != nil {
				return nil, err
			}

			la := len(toAppend)
			if la != 0 {
				// We already have something to append ...
				if la+len(chunkBody) > int(s.MaxRequestBodySize) {
					// ... and it's just the right size, without currently processed chunk.
					break
				}
				// ... we can try appending more.
				i++
				toAppend = append(toAppend, chunkBody...)
				continue
			}

			// la == 0
			i++
			toAppend = chunkBody

			if len(chunkBody) > int(s.MaxRequestBodySize) {
				s.Log.Warnf(
					"max_request_body_size set to %d which is too small even for a single metric (len: %d), sending without split",
					s.MaxRequestBodySize, len(chunkBody),
				)

				// The serialized metric is too big, but we have no choice
				// but to send it.
				// max_request_body_size was set so small that it wouldn't
				// even accommodate a single metric.
				break
			}

			continue
		}

		if toAppend == nil {
			break
		}

		chunks = append(chunks, toAppend)
	}

	return chunks, nil
}

func setHeaderIfSetInConfig(r *http.Request, h header, value string) {
	if value != "" {
		r.Header.Set(string(h), value)
	}
}

func Default() *SumoLogic {
	return &SumoLogic{
		Timeout:            config.Duration(defaultClientTimeout),
		MaxRequestBodySize: defaultMaxRequestBodySize,
		headers:            make(map[string]string),
	}
}

func init() {
	outputs.Add("sumologic", func() telegraf.Output {
		return Default()
	})
}
