package influx

import (
	"bytes"
	"errors"
	"io"
	"log"

	"github.com/influxdata/telegraf"
)

// reader is an io.Reader for line protocol.
type reader struct {
	metrics    []telegraf.Metric
	serializer *Serializer
	offset     int
	buf        *bytes.Buffer
}

// NewReader creates a new reader over the given metrics.
func NewReader(metrics []telegraf.Metric, serializer *Serializer) io.Reader {
	return &reader{
		metrics:    metrics,
		serializer: serializer,
		offset:     0,
		buf:        bytes.NewBuffer(make([]byte, 0, serializer.MaxLineBytes)),
	}
}

// SetMetrics changes the metrics to be read.
func (r *reader) SetMetrics(metrics []telegraf.Metric) {
	r.metrics = metrics
	r.offset = 0
	r.buf.Reset()
}

// Read reads up to len(p) bytes of the current metric into p, each call will
// only serialize at most one metric so the number of bytes read may be less
// than p.  Subsequent calls to Read will read the next metric until all are
// emitted.  If a metric cannot be serialized, an error will be returned, you
// may resume with the next metric by calling Read again.  When all metrics
// are emitted the err is io.EOF.
func (r *reader) Read(p []byte) (int, error) {
	if r.buf.Len() > 0 {
		return r.buf.Read(p)
	}

	if r.offset >= len(r.metrics) {
		return 0, io.EOF
	}

	for _, metric := range r.metrics[r.offset:] {
		err := r.serializer.write(r.buf, metric)
		r.offset++
		if err != nil {
			r.buf.Reset()
			var mErr *metricError
			if errors.As(err, &mErr) {
				continue
			}
			// Since we are serializing multiple metrics, don't fail the
			// the entire batch just because of one unserializable metric.
			log.Printf("E! [serializers.influx] could not serialize metric: %v; discarding metric", err)
			continue
		}
		break
	}

	return r.buf.Read(p)
}
