package agent

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"runtime"
	"sync"
	"time"

	"github.com/fatih/color"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/internal"
	"github.com/influxdata/telegraf/internal/clock"
	"github.com/influxdata/telegraf/models"
	"github.com/influxdata/telegraf/plugins/common/snmp"
	"github.com/influxdata/telegraf/plugins/processors"
	"github.com/influxdata/telegraf/plugins/serializers/influx"
)

// Agent runs a set of plugins.
type Agent struct {
	Config *config.Config
}

// NewAgent returns an Agent for the given Config.
func NewAgent(cfg *config.Config) *Agent {
	a := &Agent{
		Config: cfg,
	}
	return a
}

// inputUnit is a group of input plugins and the shared channel they write to.
//
// ┌───────┐
// │ Input │───┐
// └───────┘   │
// ┌───────┐   │     ______
// │ Input │───┼──▶ ()_____)
// └───────┘   │
// ┌───────┐   │
// │ Input │───┘
// └───────┘
type inputUnit struct {
	dst    chan<- telegraf.Metric
	inputs []*models.RunningInput
}

//  ______     ┌───────────┐     ______
// ()_____)──▶ │ Processor │──▶ ()_____)
//             └───────────┘

type processorUnit struct {
	src       <-chan telegraf.Metric
	dst       chan<- telegraf.Metric
	processor *models.RunningProcessor
}

// aggregatorUnit is a group of Aggregators and their source and sink channels.
// Typically, the aggregators write to a processor channel and pass the original
// metrics to the output channel.  The sink channels may be the same channel.

//                 ┌────────────┐
//            ┌──▶ │ Aggregator │───┐
//            │    └────────────┘   │
//  ______    │    ┌────────────┐   │     ______
// ()_____)───┼──▶ │ Aggregator │───┼──▶ ()_____)
//            │    └────────────┘   │
//            │    ┌────────────┐   │
//            ├──▶ │ Aggregator │───┘
//            │    └────────────┘
//            │                           ______
//            └────────────────────────▶ ()_____)

type aggregatorUnit struct {
	src         <-chan telegraf.Metric
	aggC        chan<- telegraf.Metric
	outputC     chan<- telegraf.Metric
	aggregators []*models.RunningAggregator
}

// outputUnit is a group of Outputs and their source channel.  Metrics on the
// channel are written to all outputs.

//                            ┌────────┐
//                       ┌──▶ │ Output │
//                       │    └────────┘
//  ______     ┌─────┐   │    ┌────────┐
// ()_____)──▶ │ Fan │───┼──▶ │ Output │
//             └─────┘   │    └────────┘
//                       │    ┌────────┐
//                       └──▶ │ Output │
//                            └────────┘

type outputUnit struct {
	src     <-chan telegraf.Metric
	outputs []*models.RunningOutput
}

// Run starts and runs the Agent until the context is done.
func (a *Agent) Run(ctx context.Context) error {
	log.Printf("I! [agent] Config: Interval:%s, Quiet:%#v, Hostname:%#v, "+
		"Flush Interval:%s",
		time.Duration(a.Config.Agent.Interval), a.Config.Agent.Quiet,
		a.Config.Agent.Hostname, time.Duration(a.Config.Agent.FlushInterval))

	// Set the default for processor skipping
	if a.Config.Agent.SkipProcessorsAfterAggregators == nil {
		msg := `The default value of 'skip_processors_after_aggregators' will change to 'true' with Telegraf v1.40.0! `
		msg += `If you need the current default behavior, please explicitly set the option to 'false'!`
		log.Print("W! [agent] ", color.YellowString(msg))
		skipProcessorsAfterAggregators := false
		a.Config.Agent.SkipProcessorsAfterAggregators = &skipProcessorsAfterAggregators
	}

	log.Printf("D! [agent] Initializing plugins")
	if err := a.InitPlugins(); err != nil {
		return err
	}

	if a.Config.Persister != nil {
		log.Printf("D! [agent] Initializing plugin states")
		if err := a.initPersister(); err != nil {
			return err
		}
		if err := a.Config.Persister.Load(); err != nil {
			if !errors.Is(err, os.ErrNotExist) {
				return err
			}
			log.Print("I! [agent] State file does not exist... Skip restoring states...")
		}
	}

	startTime := time.Now()

	log.Printf("D! [agent] Connecting outputs")
	next, ou, err := a.startOutputs(ctx, a.Config.Outputs)
	if err != nil {
		return err
	}

	var apu []*processorUnit
	var au *aggregatorUnit
	if len(a.Config.Aggregators) != 0 {
		aggC := next
		if len(a.Config.AggProcessors) != 0 && !*a.Config.Agent.SkipProcessorsAfterAggregators {
			aggC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
			if err != nil {
				return err
			}
		}

		next, au = a.startAggregators(aggC, next, a.Config.Aggregators)
	}

	var pu []*processorUnit
	if len(a.Config.Processors) != 0 {
		next, pu, err = a.startProcessors(next, a.Config.Processors)
		if err != nil {
			return err
		}
	}

	iu, err := a.startInputs(next, a.Config.Inputs)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		a.runOutputs(ou)
	}()

	if au != nil {
		wg.Add(1)
		go func() {
			defer wg.Done()
			a.runProcessors(apu)
		}()

		wg.Add(1)
		go func() {
			defer wg.Done()
			a.runAggregators(startTime, au)
		}()
	}

	if pu != nil {
		wg.Add(1)
		go func() {
			defer wg.Done()
			a.runProcessors(pu)
		}()
	}

	wg.Add(1)
	go func() {
		defer wg.Done()
		a.runInputs(ctx, startTime, iu)
	}()

	wg.Wait()

	if a.Config.Persister != nil {
		log.Printf("D! [agent] Persisting plugin states")
		if err := a.Config.Persister.Store(); err != nil {
			return err
		}
	}

	log.Printf("D! [agent] Stopped Successfully")
	return err
}

// InitPlugins runs the Init function on plugins.
func (a *Agent) InitPlugins() error {
	for _, input := range a.Config.Inputs {
		// Share the snmp translator setting with plugins that need it.
		if tp, ok := input.Input.(snmp.TranslatorPlugin); ok {
			tp.SetTranslator(a.Config.Agent.SnmpTranslator)
		}
		err := input.Init()
		if err != nil {
			return fmt.Errorf("could not initialize input %s: %w", input.LogName(), err)
		}
	}
	for _, processor := range a.Config.Processors {
		err := processor.Init()
		if err != nil {
			return fmt.Errorf("could not initialize processor %s: %w", processor.LogName(), err)
		}
	}
	for _, aggregator := range a.Config.Aggregators {
		err := aggregator.Init()
		if err != nil {
			return fmt.Errorf("could not initialize aggregator %s: %w", aggregator.LogName(), err)
		}
	}
	if !*a.Config.Agent.SkipProcessorsAfterAggregators {
		for _, processor := range a.Config.AggProcessors {
			err := processor.Init()
			if err != nil {
				return fmt.Errorf("could not initialize processor %s: %w", processor.LogName(), err)
			}
		}
	}
	for _, output := range a.Config.Outputs {
		err := output.Init()
		if err != nil {
			return fmt.Errorf("could not initialize output %s: %w", output.LogName(), err)
		}
	}
	return nil
}

// initPersister initializes the persister and registers the plugins.
func (a *Agent) initPersister() error {
	if err := a.Config.Persister.Init(); err != nil {
		return err
	}

	for _, input := range a.Config.Inputs {
		plugin, ok := input.Input.(telegraf.StatefulPlugin)
		if !ok {
			continue
		}

		name := input.LogName()
		id := input.ID()
		if err := a.Config.Persister.Register(id, plugin); err != nil {
			return fmt.Errorf("could not register input %s: %w", name, err)
		}
	}

	for _, processor := range a.Config.Processors {
		var plugin telegraf.StatefulPlugin
		if p, ok := processor.Processor.(processors.HasUnwrap); ok {
			plugin, ok = p.Unwrap().(telegraf.StatefulPlugin)
			if !ok {
				continue
			}
		} else {
			plugin, ok = processor.Processor.(telegraf.StatefulPlugin)
			if !ok {
				continue
			}
		}

		name := processor.LogName()
		id := processor.ID()
		if err := a.Config.Persister.Register(id, plugin); err != nil {
			return fmt.Errorf("could not register processor %s: %w", name, err)
		}
	}

	for _, aggregator := range a.Config.Aggregators {
		plugin, ok := aggregator.Aggregator.(telegraf.StatefulPlugin)
		if !ok {
			continue
		}

		name := aggregator.LogName()
		id := aggregator.ID()
		if err := a.Config.Persister.Register(id, plugin); err != nil {
			return fmt.Errorf("could not register aggregator %s: %w", name, err)
		}
	}

	for _, processor := range a.Config.AggProcessors {
		plugin, ok := processor.Processor.(telegraf.StatefulPlugin)
		if !ok {
			continue
		}

		name := processor.LogName()
		id := processor.ID()
		if err := a.Config.Persister.Register(id, plugin); err != nil {
			return fmt.Errorf("could not register aggregating processor %s: %w", name, err)
		}
	}

	for _, output := range a.Config.Outputs {
		plugin, ok := output.Output.(telegraf.StatefulPlugin)
		if !ok {
			continue
		}

		name := output.LogName()
		id := output.ID()
		if err := a.Config.Persister.Register(id, plugin); err != nil {
			return fmt.Errorf("could not register output %s: %w", name, err)
		}
	}

	return nil
}

func (*Agent) startInputs(dst chan<- telegraf.Metric, inputs []*models.RunningInput) (*inputUnit, error) {
	log.Printf("D! [agent] Starting service inputs")

	unit := &inputUnit{
		dst: dst,
	}

	for _, input := range inputs {
		// Service input plugins are not normally subject to timestamp
		// rounding except for when precision is set on the input plugin.
		//
		// This only applies to the accumulator passed to Start(), the
		// Gather() accumulator does apply rounding according to the
		// precision and interval agent/plugin settings.
		var interval time.Duration
		var precision time.Duration
		if input.Config.Precision != 0 {
			precision = input.Config.Precision
		}

		acc := NewAccumulator(input, dst)
		acc.SetPrecision(getPrecision(precision, interval))

		if err := input.Start(acc); err != nil {
			// If the model tells us to remove the plugin we do so without error
			var fatalErr *internal.FatalError
			if errors.As(err, &fatalErr) {
				log.Printf("I! [agent] Failed to start %s, shutting down plugin: %s", input.LogName(), err)
				continue
			}

			stopRunningInputs(unit.inputs)

			return nil, fmt.Errorf("starting input %s: %w", input.LogName(), err)
		}
		if err := input.Probe(); err != nil {
			// Probe failures are non-fatal to the agent but should only remove the plugin
			log.Printf("I! [agent] Failed to probe %s, shutting down plugin: %s", input.LogName(), err)
			input.Stop()
			continue
		}
		unit.inputs = append(unit.inputs, input)
	}

	return unit, nil
}

// runInputs starts and triggers the periodic gather for Inputs.
//
// When the context is done the timers are stopped and this function returns
// after all ongoing Gather calls complete.
func (a *Agent) runInputs(ctx context.Context, startTime time.Time, unit *inputUnit) {
	var wg sync.WaitGroup
	var options []clock.Option

	// Initialize time rounding
	if a.Config.Agent.RoundInterval {
		options = append(options, clock.WithAlignment(startTime))
	}

	tickers := make([]*clock.Ticker, 0, len(unit.inputs))
	for _, input := range unit.inputs {
		// Overwrite agent interval if this plugin has its own.
		interval := time.Duration(a.Config.Agent.Interval)
		if input.Config.Interval != 0 {
			interval = input.Config.Interval
		}

		// Overwrite agent precision if this plugin has its own.
		precision := time.Duration(a.Config.Agent.Precision)
		if input.Config.Precision != 0 {
			precision = input.Config.Precision
		}

		// Overwrite agent collection_jitter if this plugin has its own.
		jitter := time.Duration(a.Config.Agent.CollectionJitter)
		if input.Config.CollectionJitterSet {
			jitter = input.Config.CollectionJitter
		}

		// Overwrite agent collection_offset if this plugin has its own.
		offset := time.Duration(a.Config.Agent.CollectionOffset)
		if input.Config.CollectionOffset != 0 {
			offset = input.Config.CollectionOffset
		}

		ticker := clock.NewTicker(interval, jitter, offset, options...)
		tickers = append(tickers, ticker)

		acc := NewAccumulator(input, unit.dst)
		acc.SetPrecision(getPrecision(precision, interval))

		wg.Add(1)
		go func(input *models.RunningInput) {
			defer wg.Done()
			a.gatherLoop(ctx, acc, input, ticker, interval)
		}(input)
	}
	defer stopTickers(tickers)
	wg.Wait()

	log.Printf("D! [agent] Stopping service inputs")
	stopRunningInputs(unit.inputs)

	close(unit.dst)
	log.Printf("D! [agent] Input channel closed")
}

// testStartInputs is a variation of startInputs for use in --test and --once mode.
// It differs by logging Start errors and returning only plugins successfully started.
func (*Agent) testStartInputs(dst chan<- telegraf.Metric, inputs []*models.RunningInput) *inputUnit {
	log.Printf("D! [agent] Starting service inputs")

	unit := &inputUnit{
		dst: dst,
	}

	for _, input := range inputs {
		// Service input plugins are not subject to timestamp rounding.
		// This only applies to the accumulator passed to Start(), the
		// Gather() accumulator does apply rounding according to the
		// precision agent setting.
		acc := NewAccumulator(input, dst)
		acc.SetPrecision(time.Nanosecond)

		if err := input.Start(acc); err != nil {
			log.Printf("E! [agent] Starting input %s: %v", input.LogName(), err)
			continue
		}

		unit.inputs = append(unit.inputs, input)
	}

	return unit
}

// testRunInputs is a variation of runInputs for use in --test and --once mode.
// Instead of using a ticker to run the inputs they are called once immediately.
func (a *Agent) testRunInputs(
	ctx context.Context,
	wait time.Duration,
	unit *inputUnit,
) {
	var wg sync.WaitGroup

	nul := make(chan telegraf.Metric)
	go func() {
		//nolint:revive // empty block needed here
		for range nul {
		}
	}()

	for _, input := range unit.inputs {
		wg.Add(1)
		go func(input *models.RunningInput) {
			defer wg.Done()

			// Overwrite agent interval if this plugin has its own.
			interval := time.Duration(a.Config.Agent.Interval)
			if input.Config.Interval != 0 {
				interval = input.Config.Interval
			}

			// Overwrite agent precision if this plugin has its own.
			precision := time.Duration(a.Config.Agent.Precision)
			if input.Config.Precision != 0 {
				precision = input.Config.Precision
			}

			// Run plugins that require multiple gathers to calculate rate
			// and delta metrics twice.
			switch input.Config.Name {
			case "cpu", "mongodb", "procstat":
				nulAcc := NewAccumulator(input, nul)
				nulAcc.SetPrecision(getPrecision(precision, interval))
				if err := input.Input.Gather(nulAcc); err != nil {
					nulAcc.AddError(err)
				}

				time.Sleep(500 * time.Millisecond)
			}

			acc := NewAccumulator(input, unit.dst)
			acc.SetPrecision(getPrecision(precision, interval))

			if err := input.Input.Gather(acc); err != nil {
				acc.AddError(err)
			}
		}(input)
	}
	wg.Wait()

	if err := internal.SleepContext(ctx, wait); err != nil {
		log.Printf("E! [agent] SleepContext finished with: %v", err)
	}

	log.Printf("D! [agent] Stopping service inputs")
	stopRunningInputs(unit.inputs)

	close(unit.dst)
	log.Printf("D! [agent] Input channel closed")
}

// stopRunningInputs stops all service inputs.
func stopRunningInputs(inputs []*models.RunningInput) {
	for _, input := range inputs {
		input.Stop()
	}
}

// stopRunningOutputs stops all running outputs.
func stopRunningOutputs(outputs []*models.RunningOutput) {
	for _, output := range outputs {
		output.Close()
	}
}

// gather runs an input's gather function periodically until the context is
// done.
func (a *Agent) gatherLoop(
	ctx context.Context,
	acc telegraf.Accumulator,
	input *models.RunningInput,
	ticker *clock.Ticker,
	interval time.Duration,
) {
	for {
		select {
		case <-ticker.C:
			err := a.gatherOnce(acc, input, ticker, interval)
			if err != nil {
				acc.AddError(err)
			}
		case <-ctx.Done():
			return
		}
	}
}

// gatherOnce runs the input's Gather function once, logging a warning each interval it fails to complete before.
func (*Agent) gatherOnce(acc telegraf.Accumulator, input *models.RunningInput, ticker *clock.Ticker, interval time.Duration) error {
	done := make(chan error)
	go func() {
		defer panicRecover(input)
		done <- input.Gather(acc)
	}()

	// Only warn after interval seconds, even if the interval is started late.
	// Intervals can start late if the previous interval went over or due to
	// clock changes.
	slowWarning := time.NewTicker(interval)
	defer slowWarning.Stop()

	for {
		select {
		case err := <-done:
			return err
		case <-slowWarning.C:
			log.Printf("W! [%s] Collection took longer than expected; not complete after interval of %s",
				input.LogName(), interval)
			input.IncrGatherTimeouts()
		case <-ticker.C:
			log.Printf("D! [%s] Previous collection has not completed; scheduled collection skipped",
				input.LogName())
		}
	}
}

// startProcessors sets up the processor chain and calls Start on all processors.  If an error occurs any started processors are Stopped.
func (*Agent) startProcessors(dst chan<- telegraf.Metric, runningProcessors models.RunningProcessors) (chan<- telegraf.Metric, []*processorUnit, error) {
	var src chan telegraf.Metric
	units := make([]*processorUnit, 0, len(runningProcessors))
	// The processor chain is constructed from the output side starting from
	// the output(s) and walking the way back to the input(s). However, the
	// processor-list is sorted by order and/or by appearance in the config,
	// i.e. in input-to-output direction. Therefore, reverse the processor list
	// to reflect the order/definition order in the processing chain.
	for i := len(runningProcessors) - 1; i >= 0; i-- {
		processor := runningProcessors[i]

		src = make(chan telegraf.Metric, 100)
		acc := NewAccumulator(processor, dst)

		err := processor.Start(acc)
		if err != nil {
			for _, u := range units {
				u.processor.Stop()
				close(u.dst)
			}
			return nil, nil, fmt.Errorf("starting processor %s: %w", processor.LogName(), err)
		}

		units = append(units, &processorUnit{
			src:       src,
			dst:       dst,
			processor: processor,
		})

		dst = src
	}

	return src, units, nil
}

// runProcessors begins processing metrics and runs until the source channel is closed and all metrics have been written.
func (*Agent) runProcessors(units []*processorUnit) {
	var wg sync.WaitGroup
	for _, unit := range units {
		wg.Add(1)
		go func(unit *processorUnit) {
			defer wg.Done()

			acc := NewAccumulator(unit.processor, unit.dst)
			for m := range unit.src {
				if err := unit.processor.Add(m, acc); err != nil {
					acc.AddError(err)
					m.Drop()
				}
			}
			unit.processor.Stop()
			close(unit.dst)
			log.Printf("D! [agent] Processor channel closed")
		}(unit)
	}
	wg.Wait()
}

// startAggregators sets up the aggregator unit and returns the source channel.
func (*Agent) startAggregators(aggC, outputC chan<- telegraf.Metric, aggregators []*models.RunningAggregator) (chan<- telegraf.Metric, *aggregatorUnit) {
	src := make(chan telegraf.Metric, 100)
	unit := &aggregatorUnit{
		src:         src,
		aggC:        aggC,
		outputC:     outputC,
		aggregators: aggregators,
	}
	return src, unit
}

// runAggregators beings aggregating metrics and runs until the source channel
// is closed and all metrics have been written.
func (a *Agent) runAggregators(
	startTime time.Time,
	unit *aggregatorUnit,
) {
	ctx, cancel := context.WithCancel(context.Background())

	// Before calling Add, initialize the aggregation window.  This ensures
	// that any metric created after start time will be aggregated.
	for _, agg := range a.Config.Aggregators {
		since, until := updateWindow(startTime, a.Config.Agent.RoundInterval, agg.Period())
		agg.UpdateWindow(since, until)
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for metric := range unit.src {
			var dropOriginal bool
			for _, agg := range a.Config.Aggregators {
				if ok := agg.Add(metric); ok {
					dropOriginal = true
				}
			}

			if !dropOriginal {
				unit.outputC <- metric // keep original.
			} else {
				metric.Drop()
			}
		}
		cancel()
	}()

	for _, agg := range a.Config.Aggregators {
		wg.Add(1)
		go func(agg *models.RunningAggregator) {
			defer wg.Done()

			interval := time.Duration(a.Config.Agent.Interval)
			precision := time.Duration(a.Config.Agent.Precision)

			acc := NewAccumulator(agg, unit.aggC)
			acc.SetPrecision(getPrecision(precision, interval))
			a.push(ctx, agg, acc)
		}(agg)
	}

	wg.Wait()

	// In the case that there are no processors, both aggC and outputC are the
	// same channel.  If there are processors, we close the aggC and the
	// processor chain will close the outputC when it finishes processing.
	close(unit.aggC)
	log.Printf("D! [agent] Aggregator channel closed")
}

func updateWindow(start time.Time, roundInterval bool, period time.Duration) (since, until time.Time) {
	if roundInterval {
		until = internal.AlignTime(start, period)
		if until.Equal(start) {
			until = internal.AlignTime(start.Add(time.Nanosecond), period)
		}
	} else {
		until = start.Add(period)
	}

	since = until.Add(-period)

	return since, until
}

// push runs the push for a single aggregator every period.
func (*Agent) push(ctx context.Context, aggregator *models.RunningAggregator, acc telegraf.Accumulator) {
	for {
		// Ensures that Push will be called for each period, even if it has
		// already elapsed before this function is called.  This is guaranteed
		// because so long as only Push updates the EndPeriod.  This method
		// also avoids drift by not using a ticker.
		until := time.Until(aggregator.EndPeriod())

		select {
		case <-time.After(until):
			aggregator.Push(acc)
		case <-ctx.Done():
			aggregator.Push(acc)
			return
		}
	}
}

// startOutputs calls Connect on all outputs and returns the source channel.
// If an error occurs calling Connect, all started plugins have Close called.
func (a *Agent) startOutputs(
	ctx context.Context,
	outputs []*models.RunningOutput,
) (chan<- telegraf.Metric, *outputUnit, error) {
	src := make(chan telegraf.Metric, 100)
	unit := &outputUnit{src: src}
	for _, output := range outputs {
		if err := a.connectOutput(ctx, output); err != nil {
			var fatalErr *internal.FatalError
			if errors.As(err, &fatalErr) {
				// If the model tells us to remove the plugin we do so without error
				log.Printf("I! [agent] Failed to connect to [%s], error was %q;  shutting down plugin...", output.LogName(), err)
				output.Close()
				continue
			}

			for _, unitOutput := range unit.outputs {
				unitOutput.Close()
			}
			return nil, nil, fmt.Errorf("connecting output %s: %w", output.LogName(), err)
		}

		unit.outputs = append(unit.outputs, output)
	}

	return src, unit, nil
}

// connectOutput connects to all outputs.
func (*Agent) connectOutput(ctx context.Context, output *models.RunningOutput) error {
	log.Printf("D! [agent] Attempting connection to [%s]", output.LogName())
	if err := output.Connect(); err != nil {
		log.Printf("E! [agent] Failed to connect to [%s], retrying in 15s, error was %q", output.LogName(), err)

		if err := internal.SleepContext(ctx, 15*time.Second); err != nil {
			return err
		}

		if err = output.Connect(); err != nil {
			return fmt.Errorf("error connecting to output %q: %w", output.LogName(), err)
		}
	}
	log.Printf("D! [agent] Successfully connected to %s", output.LogName())
	return nil
}

// runOutputs begins processing metrics and returns until the source channel is
// closed and all metrics have been written.  On shutdown metrics will be
// written one last time and dropped if unsuccessful.
func (a *Agent) runOutputs(
	unit *outputUnit,
) {
	var wg sync.WaitGroup

	// Start flush loop
	interval := time.Duration(a.Config.Agent.FlushInterval)
	jitter := time.Duration(a.Config.Agent.FlushJitter)

	ctx, cancel := context.WithCancel(context.Background())

	for _, output := range unit.outputs {
		interval := interval
		// Overwrite agent flush_interval if this plugin has its own.
		if output.Config.FlushInterval != 0 {
			interval = output.Config.FlushInterval
		}

		jitter := jitter
		// Overwrite agent flush_jitter if this plugin has its own.
		if output.Config.FlushJitter != 0 {
			jitter = output.Config.FlushJitter
		}

		wg.Add(1)
		go func(output *models.RunningOutput) {
			defer wg.Done()

			timer := clock.NewTimer(interval, jitter)
			defer timer.Stop()

			a.flushLoop(ctx, output, timer)
		}(output)
	}

	for metric := range unit.src {
		for i, output := range unit.outputs {
			if i == len(unit.outputs)-1 {
				output.AddMetricNoCopy(metric)
			} else {
				output.AddMetric(metric)
			}
		}
	}

	log.Println("I! [agent] Hang on, flushing any cached metrics before shutdown")
	cancel()
	wg.Wait()

	log.Println("I! [agent] Stopping running outputs")
	stopRunningOutputs(unit.outputs)
}

// flushLoop runs an output's flush function periodically until the context is
// done.
func (a *Agent) flushLoop(ctx context.Context, output *models.RunningOutput, timer *clock.Timer) {
	logError := func(err error) {
		if err != nil {
			log.Printf("E! [agent] Error writing to %s: %v", output.LogName(), err)
		}
	}

	// watch for flush requests
	flushRequested := make(chan os.Signal, 1)
	watchForFlushSignal(flushRequested)
	defer stopListeningForFlushSignal(flushRequested)

	for {
		// Favor shutdown over other methods.
		select {
		case <-ctx.Done():
			logError(a.flushOnce(output, timer, output.Write))
			return
		default:
		}

		select {
		case <-ctx.Done():
			logError(a.flushOnce(output, timer, output.Write))
			return
		case <-timer.C:
			logError(a.flushOnce(output, timer, output.Write))
		case <-flushRequested:
			logError(a.flushOnce(output, timer, output.Write))
		case <-output.BatchReady:
			logError(a.flushBatch(output, output.WriteBatch))
		}
	}
}

// flushOnce runs the output's Write function once, logging a warning each interval it fails to complete before the flush interval elapses.
func (*Agent) flushOnce(output *models.RunningOutput, timer *clock.Timer, writeFunc func() error) error {
	done := make(chan error)
	go func() {
		done <- writeFunc()
	}()

	for {
		select {
		case err := <-done:
			output.LogBufferStatus()
			return err
		case <-timer.C:
			log.Printf("W! [agent] [%q] did not complete within its flush interval",
				output.LogName())
			output.LogBufferStatus()
		}
	}
}

// flushBatch runs the output's Write function once Unlike flushOnce the interval elapsing is not considered during these flushes.
func (*Agent) flushBatch(output *models.RunningOutput, writeFunc func() error) error {
	err := writeFunc()
	output.LogBufferStatus()
	return err
}

// Test runs the inputs, processors and aggregators for a single gather and
// writes the metrics to stdout.
func (a *Agent) Test(ctx context.Context, wait time.Duration) error {
	src := make(chan telegraf.Metric, 100)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		s := &influx.Serializer{SortFields: true, UintSupport: true}
		for metric := range src {
			octets, err := s.Serialize(metric)
			if err == nil {
				fmt.Print("> ", string(octets))
			}
			metric.Reject()
		}
	}()

	err := a.runTest(ctx, wait, src)
	if err != nil {
		return err
	}

	wg.Wait()

	if models.GlobalGatherErrors.Get() != 0 {
		return fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get())
	}
	return nil
}

// runTest runs the agent and performs a single gather sending output to the
// outputC. After gathering pauses for the wait duration to allow service
// inputs to run.
func (a *Agent) runTest(ctx context.Context, wait time.Duration, outputC chan<- telegraf.Metric) error {
	// Set the default for processor skipping
	if a.Config.Agent.SkipProcessorsAfterAggregators == nil {
		msg := `The default value of 'skip_processors_after_aggregators' will change to 'true' with Telegraf v1.40.0! `
		msg += `If you need the current default behavior, please explicitly set the option to 'false'!`
		log.Print("W! [agent] ", color.YellowString(msg))
		skipProcessorsAfterAggregators := false
		a.Config.Agent.SkipProcessorsAfterAggregators = &skipProcessorsAfterAggregators
	}

	log.Printf("D! [agent] Initializing plugins")
	if err := a.InitPlugins(); err != nil {
		return err
	}

	startTime := time.Now()

	next := outputC

	var apu []*processorUnit
	var au *aggregatorUnit
	if len(a.Config.Aggregators) != 0 {
		procC := next
		if len(a.Config.AggProcessors) != 0 && !*a.Config.Agent.SkipProcessorsAfterAggregators {
			var err error
			procC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
			if err != nil {
				return err
			}
		}

		next, au = a.startAggregators(procC, next, a.Config.Aggregators)
	}

	var pu []*processorUnit
	if len(a.Config.Processors) != 0 {
		var err error
		next, pu, err = a.startProcessors(next, a.Config.Processors)
		if err != nil {
			return err
		}
	}

	iu := a.testStartInputs(next, a.Config.Inputs)

	var wg sync.WaitGroup
	if au != nil {
		wg.Add(1)
		go func() {
			defer wg.Done()
			a.runProcessors(apu)
		}()

		wg.Add(1)
		go func() {
			defer wg.Done()
			a.runAggregators(startTime, au)
		}()
	}

	if pu != nil {
		wg.Add(1)
		go func() {
			defer wg.Done()
			a.runProcessors(pu)
		}()
	}

	wg.Add(1)
	go func() {
		defer wg.Done()
		a.testRunInputs(ctx, wait, iu)
	}()

	wg.Wait()

	log.Printf("D! [agent] Stopped Successfully")

	return nil
}

// Once runs the full agent for a single gather.
func (a *Agent) Once(ctx context.Context, wait time.Duration) error {
	err := a.runOnce(ctx, wait)
	if err != nil {
		return err
	}

	if models.GlobalGatherErrors.Get() != 0 {
		return fmt.Errorf("input plugins recorded %d errors", models.GlobalGatherErrors.Get())
	}

	unsent := 0
	for _, output := range a.Config.Outputs {
		unsent += output.BufferLength()
	}
	if unsent != 0 {
		return fmt.Errorf("output plugins unable to send %d metrics", unsent)
	}
	return nil
}

// runOnce runs the agent and performs a single gather sending output to the
// outputC. After gathering pauses for the wait duration to allow service
// inputs to run.
func (a *Agent) runOnce(ctx context.Context, wait time.Duration) error {
	// Set the default for processor skipping
	if a.Config.Agent.SkipProcessorsAfterAggregators == nil {
		msg := `The default value of 'skip_processors_after_aggregators' will change to 'true' with Telegraf v1.40.0! `
		msg += `If you need the current default behavior, please explicitly set the option to 'false'!`
		log.Print("W! [agent] ", color.YellowString(msg))
		skipProcessorsAfterAggregators := false
		a.Config.Agent.SkipProcessorsAfterAggregators = &skipProcessorsAfterAggregators
	}

	log.Printf("D! [agent] Initializing plugins")
	if err := a.InitPlugins(); err != nil {
		return err
	}

	startTime := time.Now()

	log.Printf("D! [agent] Connecting outputs")
	next, ou, err := a.startOutputs(ctx, a.Config.Outputs)
	if err != nil {
		return err
	}

	var apu []*processorUnit
	var au *aggregatorUnit
	if len(a.Config.Aggregators) != 0 {
		procC := next
		if len(a.Config.AggProcessors) != 0 && !*a.Config.Agent.SkipProcessorsAfterAggregators {
			procC, apu, err = a.startProcessors(next, a.Config.AggProcessors)
			if err != nil {
				return err
			}
		}

		next, au = a.startAggregators(procC, next, a.Config.Aggregators)
	}

	var pu []*processorUnit
	if len(a.Config.Processors) != 0 {
		next, pu, err = a.startProcessors(next, a.Config.Processors)
		if err != nil {
			return err
		}
	}

	iu := a.testStartInputs(next, a.Config.Inputs)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		a.runOutputs(ou)
	}()

	if au != nil {
		wg.Add(1)
		go func() {
			defer wg.Done()
			a.runProcessors(apu)
		}()

		wg.Add(1)
		go func() {
			defer wg.Done()
			a.runAggregators(startTime, au)
		}()
	}

	if pu != nil {
		wg.Add(1)
		go func() {
			defer wg.Done()
			a.runProcessors(pu)
		}()
	}

	wg.Add(1)
	go func() {
		defer wg.Done()
		a.testRunInputs(ctx, wait, iu)
	}()

	wg.Wait()

	log.Printf("D! [agent] Stopped Successfully")

	return nil
}

// Returns the rounding precision for metrics.
func getPrecision(precision, interval time.Duration) time.Duration {
	if precision > 0 {
		return precision
	}

	switch {
	case interval >= time.Second:
		return time.Second
	case interval >= time.Millisecond:
		return time.Millisecond
	case interval >= time.Microsecond:
		return time.Microsecond
	default:
		return time.Nanosecond
	}
}

// panicRecover displays an error if an input panics.
func panicRecover(input *models.RunningInput) {
	//nolint:revive // recover is called inside a deferred function
	if err := recover(); err != nil {
		trace := make([]byte, 2048)
		runtime.Stack(trace, true)
		log.Printf("E! FATAL: [%s] panicked: %s, Stack:\n%s",
			input.LogName(), err, trace)
		log.Fatalln("E! PLEASE REPORT THIS PANIC ON GITHUB with " +
			"stack trace, configuration, and OS information: " +
			"https://github.com/influxdata/telegraf/issues/new/choose")
	}
}

func stopTickers(tickers []*clock.Ticker) {
	for _, ticker := range tickers {
		ticker.Stop()
	}
}
