//go:generate ../../../tools/readme_config_includer/generator
package cloud_pubsub

import (
	"context"
	_ "embed"
	"encoding/base64"
	"errors"
	"fmt"
	"sync"
	"time"

	"cloud.google.com/go/pubsub/v2"
	"golang.org/x/oauth2/google"
	"google.golang.org/api/option"

	"github.com/influxdata/telegraf"
	"github.com/influxdata/telegraf/config"
	"github.com/influxdata/telegraf/internal"
	common_gcp "github.com/influxdata/telegraf/plugins/common/gcp"
	"github.com/influxdata/telegraf/plugins/inputs"
)

//go:embed sample.conf
var sampleConfig string

var once sync.Once

const (
	defaultMaxUndeliveredMessages = 1000
	defaultRetryDelaySeconds      = 5
)

type PubSub struct {
	sync.Mutex

	CredentialsFile string `toml:"credentials_file"`
	Project         string `toml:"project"`
	Subscription    string `toml:"subscription"`

	// Subscription ReceiveSettings
	MaxExtension           config.Duration `toml:"max_extension"`
	MaxOutstandingMessages int             `toml:"max_outstanding_messages"`
	MaxOutstandingBytes    int             `toml:"max_outstanding_bytes"`
	MaxReceiverGoRoutines  int             `toml:"max_receiver_go_routines"`

	// Agent settings
	MaxMessageLen            int `toml:"max_message_len"`
	MaxUndeliveredMessages   int `toml:"max_undelivered_messages"`
	RetryReceiveDelaySeconds int `toml:"retry_delay_seconds"`

	Base64Data bool `toml:"base64_data"`

	ContentEncoding      string          `toml:"content_encoding"`
	MaxDecompressionSize config.Size     `toml:"max_decompression_size"`
	Log                  telegraf.Logger `toml:"-"`

	sub     subscription
	stubSub func() subscription

	cancel context.CancelFunc

	parser telegraf.Parser
	wg     *sync.WaitGroup
	acc    telegraf.TrackingAccumulator

	undelivered  map[telegraf.TrackingID]message
	sem          semaphore
	decoder      internal.ContentDecoder
	decoderMutex sync.Mutex
}

type (
	empty     struct{}
	semaphore chan empty
)

func (*PubSub) SampleConfig() string {
	return sampleConfig
}

func (ps *PubSub) Init() error {
	if ps.Subscription == "" {
		return errors.New(`"subscription" is required`)
	}

	if ps.Project == "" {
		return errors.New(`"project" is required`)
	}

	switch ps.ContentEncoding {
	case "", "identity":
		ps.ContentEncoding = "identity"
	case "gzip":
		var err error
		var options []internal.DecodingOption
		if ps.MaxDecompressionSize > 0 {
			options = append(options, internal.WithMaxDecompressionSize(int64(ps.MaxDecompressionSize)))
		}
		ps.decoder, err = internal.NewContentDecoder(ps.ContentEncoding, options...)
		if err != nil {
			return err
		}
	default:
		return fmt.Errorf("invalid value %q for content_encoding", ps.ContentEncoding)
	}

	return nil
}

func (ps *PubSub) SetParser(parser telegraf.Parser) {
	ps.parser = parser
}

// Start initializes the plugin and processing messages from Google PubSub.
// Two goroutines are started - one pulling for the subscription, one
// receiving delivery notifications from the accumulator.
func (ps *PubSub) Start(ac telegraf.Accumulator) error {
	ps.sem = make(semaphore, ps.MaxUndeliveredMessages)
	ps.acc = ac.WithTracking(ps.MaxUndeliveredMessages)

	// Create top-level context with cancel that will be called on Stop().
	ctx, cancel := context.WithCancel(context.Background())
	ps.cancel = cancel

	if ps.stubSub != nil {
		ps.sub = ps.stubSub()
	} else {
		subRef, err := ps.getGCPSubscription(ps.Subscription)
		if err != nil {
			return fmt.Errorf("unable to create subscription handle: %w", err)
		}
		ps.sub = subRef
	}

	ps.wg = &sync.WaitGroup{}
	// Start goroutine to handle delivery notifications from accumulator.
	ps.wg.Add(1)
	go func() {
		defer ps.wg.Done()
		ps.waitForDelivery(ctx)
	}()

	// Start goroutine for subscription receiver.
	ps.wg.Add(1)
	go func() {
		defer ps.wg.Done()
		ps.receiveWithRetry(ctx)
	}()

	return nil
}

// Gather does nothing for this service input.
func (*PubSub) Gather(telegraf.Accumulator) error {
	return nil
}

// Stop ensures the PubSub subscriptions receivers are stopped by
// canceling the context and waits for goroutines to finish.
func (ps *PubSub) Stop() {
	ps.cancel()
	ps.wg.Wait()
}

// startReceiver is called within a goroutine and manages keeping a
// subscription.Receive() up and running while the plugin has not been stopped.
func (ps *PubSub) receiveWithRetry(parentCtx context.Context) {
	err := ps.startReceiver(parentCtx)

	for err != nil && parentCtx.Err() == nil {
		ps.Log.Errorf("Receiver for subscription %s exited with error: %v", ps.sub.ID(), err)

		delay := defaultRetryDelaySeconds
		if ps.RetryReceiveDelaySeconds > 0 {
			delay = ps.RetryReceiveDelaySeconds
		}

		ps.Log.Infof("Waiting %d seconds before attempting to restart receiver...", delay)
		time.Sleep(time.Duration(delay) * time.Second)

		err = ps.startReceiver(parentCtx)
	}
}

func (ps *PubSub) startReceiver(parentCtx context.Context) error {
	ps.Log.Infof("Starting receiver for subscription %s...", ps.sub.ID())
	cctx, ccancel := context.WithCancel(parentCtx)
	err := ps.sub.Receive(cctx, func(ctx context.Context, msg message) {
		if err := ps.onMessage(ctx, msg); err != nil {
			ps.acc.AddError(fmt.Errorf("unable to add message from subscription %s: %w", ps.sub.ID(), err))
		}
	})
	if err != nil {
		ps.acc.AddError(fmt.Errorf("receiver for subscription %s exited: %w", ps.sub.ID(), err))
	} else {
		ps.Log.Info("Subscription pull ended (no error, most likely stopped)")
	}
	ccancel()
	return err
}

// onMessage handles parsing and adding a received message to the accumulator.
func (ps *PubSub) onMessage(ctx context.Context, msg message) error {
	if ps.MaxMessageLen > 0 && len(msg.Data()) > ps.MaxMessageLen {
		msg.Ack()
		return fmt.Errorf("message longer than max_message_len (%d > %d)", len(msg.Data()), ps.MaxMessageLen)
	}

	data, err := ps.decompressData(msg.Data())
	if err != nil {
		return fmt.Errorf("unable to decompress %s message: %w", ps.ContentEncoding, err)
	}

	data, err = ps.decodeB64Data(data)
	if err != nil {
		return fmt.Errorf("unable to decode base64 message: %w", err)
	}

	metrics, err := ps.parser.Parse(data)
	if err != nil {
		msg.Ack()
		return fmt.Errorf("unable to parse message: %w", err)
	}

	if len(metrics) == 0 {
		msg.Ack()

		once.Do(func() {
			ps.Log.Debug(internal.NoMetricsCreatedMsg)
		})

		return nil
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case ps.sem <- empty{}:
		break
	}

	ps.Lock()
	defer ps.Unlock()

	id := ps.acc.AddTrackingMetricGroup(metrics)
	if ps.undelivered == nil {
		ps.undelivered = make(map[telegraf.TrackingID]message)
	}
	ps.undelivered[id] = msg

	return nil
}

func (ps *PubSub) decompressData(data []byte) ([]byte, error) {
	if ps.ContentEncoding == "identity" {
		return data, nil
	}

	ps.decoderMutex.Lock()
	defer ps.decoderMutex.Unlock()
	data, err := ps.decoder.Decode(data)
	if err != nil {
		return nil, err
	}

	decompressedData := make([]byte, len(data))
	copy(decompressedData, data)
	data = decompressedData

	return data, nil
}

func (ps *PubSub) decodeB64Data(data []byte) ([]byte, error) {
	if ps.Base64Data {
		return base64.StdEncoding.DecodeString(string(data))
	}

	return data, nil
}

func (ps *PubSub) waitForDelivery(parentCtx context.Context) {
	for {
		select {
		case <-parentCtx.Done():
			return
		case info := <-ps.acc.Delivered():
			<-ps.sem
			msg := ps.removeDelivered(info.ID())

			if msg != nil {
				msg.Ack()
			}
		}
	}
}

func (ps *PubSub) removeDelivered(id telegraf.TrackingID) message {
	ps.Lock()
	defer ps.Unlock()

	msg, ok := ps.undelivered[id]
	if !ok {
		return nil
	}
	delete(ps.undelivered, id)
	return msg
}

func (ps *PubSub) getPubSubClient() (*pubsub.Client, error) {
	var credsOpt option.ClientOption
	if ps.CredentialsFile != "" {
		credType, err := common_gcp.ParseCredentialType(ps.CredentialsFile)
		if err != nil {
			return nil, fmt.Errorf("unable to parse credential file type: %w", err)
		}
		credsOpt = option.WithAuthCredentialsFile(option.CredentialsType(credType), ps.CredentialsFile)
	} else {
		creds, err := google.FindDefaultCredentials(context.Background(), pubsub.ScopeCloudPlatform)
		if err != nil {
			return nil, fmt.Errorf(
				"unable to find GCP Application Default Credentials: %v."+
					"Either set ADC or provide CredentialsFile config", err)
		}
		credsOpt = option.WithCredentials(creds)
	}
	client, err := pubsub.NewClient(
		context.Background(),
		ps.Project,
		credsOpt,
		option.WithScopes(pubsub.ScopeCloudPlatform),
		option.WithUserAgent(internal.ProductToken()),
	)
	if err != nil {
		return nil, fmt.Errorf("unable to generate PubSub client: %w", err)
	}
	return client, nil
}

func (ps *PubSub) getGCPSubscription(subID string) (subscription, error) {
	client, err := ps.getPubSubClient()
	if err != nil {
		return nil, err
	}
	s := client.Subscriber(subID)
	s.ReceiveSettings = pubsub.ReceiveSettings{
		NumGoroutines:          ps.MaxReceiverGoRoutines,
		MaxExtension:           time.Duration(ps.MaxExtension),
		MaxOutstandingMessages: ps.MaxOutstandingMessages,
		MaxOutstandingBytes:    ps.MaxOutstandingBytes,
	}
	return &gcpSubscription{s}, nil
}

func init() {
	inputs.Add("cloud_pubsub", func() telegraf.Input {
		ps := &PubSub{
			MaxUndeliveredMessages: defaultMaxUndeliveredMessages,
		}
		return ps
	})
}
