package beat

import (
	"fmt"
	"net"
	"net/http"
	"net/http/httptest"
	"net/url"
	"os"
	"testing"

	"github.com/stretchr/testify/require"

	"github.com/influxdata/telegraf/testutil"
)

func Test_BeatStats(t *testing.T) {
	var beat6StatsAccumulator testutil.Accumulator
	var beatTest = newBeat()
	// System stats are disabled by default
	beatTest.Includes = []string{"beat", "libbeat", "system", "filebeat"}
	require.NoError(t, beatTest.Init())
	fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, request *http.Request) {
		var jsonFilePath string

		switch request.URL.Path {
		case suffixInfo:
			jsonFilePath = "beat6_info.json"
		case suffixStats:
			jsonFilePath = "beat6_stats.json"
		default:
			w.WriteHeader(http.StatusInternalServerError)
			t.Errorf("Cannot handle request")
			return
		}

		data, err := os.ReadFile(jsonFilePath)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			t.Errorf("Could not read from data file %q: %v", jsonFilePath, err)
			return
		}
		if _, err = w.Write(data); err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			t.Error(err)
			return
		}
	}))
	requestURL, err := url.Parse(beatTest.URL)
	require.NoErrorf(t, err, "can't parse URL %s", beatTest.URL)
	fakeServer.Listener, err = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port()))
	require.NoErrorf(t, err, "can't listen for %s: %v", requestURL, err)

	fakeServer.Start()
	defer fakeServer.Close()

	require.NoError(t, beatTest.Gather(&beat6StatsAccumulator))

	beat6StatsAccumulator.AssertContainsTaggedFields(
		t,
		"beat",
		map[string]interface{}{
			"cpu_system_ticks":      float64(626970),
			"cpu_system_time_ms":    float64(626972),
			"cpu_total_ticks":       float64(5215010),
			"cpu_total_time_ms":     float64(5215018),
			"cpu_total_value":       float64(5215010),
			"cpu_user_ticks":        float64(4588040),
			"cpu_user_time_ms":      float64(4588046),
			"info_uptime_ms":        float64(327248661),
			"info_ephemeral_id":     "809e3b63-4fa0-4f74-822a-8e3c08298336",
			"memstats_gc_next":      float64(20611808),
			"memstats_memory_alloc": float64(12692544),
			"memstats_memory_total": float64(462910102088),
			"memstats_rss":          float64(80273408),
		},
		map[string]string{
			"beat_beat":    string("filebeat"),
			"beat_host":    string("node-6"),
			"beat_id":      string("9c1c8697-acb4-4df0-987d-28197814f785"),
			"beat_name":    string("node-6-test"),
			"beat_version": string("6.4.2"),
		},
	)
	beat6StatsAccumulator.AssertContainsTaggedFields(
		t,
		"beat_filebeat",
		map[string]interface{}{
			"events_active":             float64(0),
			"events_added":              float64(182990),
			"events_done":               float64(182990),
			"harvester_closed":          float64(2222),
			"harvester_open_files":      float64(4),
			"harvester_running":         float64(4),
			"harvester_skipped":         float64(0),
			"harvester_started":         float64(2226),
			"input_log_files_renamed":   float64(0),
			"input_log_files_truncated": float64(0),
		},
		map[string]string{
			"beat_beat":    string("filebeat"),
			"beat_host":    string("node-6"),
			"beat_id":      string("9c1c8697-acb4-4df0-987d-28197814f785"),
			"beat_name":    string("node-6-test"),
			"beat_version": string("6.4.2"),
		},
	)
	beat6StatsAccumulator.AssertContainsTaggedFields(
		t,
		"beat_libbeat",
		map[string]interface{}{
			"config_module_running":     float64(0),
			"config_module_starts":      float64(0),
			"config_module_stops":       float64(0),
			"config_reloads":            float64(0),
			"output_type":               "kafka",
			"output_events_acked":       float64(172067),
			"output_events_active":      float64(0),
			"output_events_batches":     float64(1490),
			"output_events_dropped":     float64(0),
			"output_events_duplicates":  float64(0),
			"output_events_failed":      float64(0),
			"output_events_total":       float64(172067),
			"output_read_bytes":         float64(0),
			"output_read_errors":        float64(0),
			"output_write_bytes":        float64(0),
			"output_write_errors":       float64(0),
			"outputs_kafka_bytes_read":  float64(1048670),
			"outputs_kafka_bytes_write": float64(43136887),
			"pipeline_clients":          float64(1),
			"pipeline_events_active":    float64(0),
			"pipeline_events_dropped":   float64(0),
			"pipeline_events_failed":    float64(0),
			"pipeline_events_filtered":  float64(10923),
			"pipeline_events_published": float64(172067),
			"pipeline_events_retry":     float64(14),
			"pipeline_events_total":     float64(182990),
			"pipeline_queue_acked":      float64(172067),
		},
		map[string]string{
			"beat_beat":    string("filebeat"),
			"beat_host":    string("node-6"),
			"beat_id":      string("9c1c8697-acb4-4df0-987d-28197814f785"),
			"beat_name":    string("node-6-test"),
			"beat_version": string("6.4.2"),
		},
	)
	beat6StatsAccumulator.AssertContainsTaggedFields(
		t,
		"beat_system",
		map[string]interface{}{
			"cpu_cores":    float64(32),
			"load_1":       float64(32.49),
			"load_15":      float64(41.9),
			"load_5":       float64(40.16),
			"load_norm_1":  float64(1.0153),
			"load_norm_15": float64(1.3094),
			"load_norm_5":  float64(1.255),
		},
		map[string]string{
			"beat_beat":    string("filebeat"),
			"beat_host":    string("node-6"),
			"beat_id":      string("9c1c8697-acb4-4df0-987d-28197814f785"),
			"beat_name":    string("node-6-test"),
			"beat_version": string("6.4.2"),
		},
	)
}

func Test_BeatRequest(t *testing.T) {
	var beat6StatsAccumulator testutil.Accumulator
	beatTest := newBeat()
	// System stats are disabled by default
	beatTest.Includes = []string{"beat", "libbeat", "system", "filebeat"}
	require.NoError(t, beatTest.Init())
	fakeServer := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, request *http.Request) {
		var jsonFilePath string

		switch request.URL.Path {
		case suffixInfo:
			jsonFilePath = "beat6_info.json"
		case suffixStats:
			jsonFilePath = "beat6_stats.json"
		default:
			w.WriteHeader(http.StatusInternalServerError)
			t.Errorf("Cannot handle request")
			return
		}

		data, err := os.ReadFile(jsonFilePath)
		if err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			t.Errorf("Could not read from data file %q: %v", jsonFilePath, err)
			return
		}
		if request.Host != "beat.test.local" {
			w.WriteHeader(http.StatusInternalServerError)
			t.Errorf("Not equal, expected: %q, actual: %q", "beat.test.local", request.Host)
			return
		}
		if request.Method != "POST" {
			w.WriteHeader(http.StatusInternalServerError)
			t.Errorf("Not equal, expected: %q, actual: %q", "POST", request.Method)
			return
		}
		if request.Header.Get("Authorization") != "Basic YWRtaW46UFdE" {
			w.WriteHeader(http.StatusInternalServerError)
			t.Errorf("Not equal, expected: %q, actual: %q", "Basic YWRtaW46UFdE", request.Header.Get("Authorization"))
			return
		}
		if request.Header.Get("X-Test") != "test-value" {
			w.WriteHeader(http.StatusInternalServerError)
			t.Errorf("Not equal, expected: %q, actual: %q", "test-value", request.Header.Get("X-Test"))
			return
		}
		if _, err = w.Write(data); err != nil {
			w.WriteHeader(http.StatusInternalServerError)
			t.Error(err)
			return
		}
	}))

	requestURL, err := url.Parse(beatTest.URL)
	require.NoErrorf(t, err, "can't parse URL %s", beatTest.URL)
	fakeServer.Listener, err = net.Listen("tcp", fmt.Sprintf("%s:%s", requestURL.Hostname(), requestURL.Port()))
	require.NoErrorf(t, err, "can't listen for %s: %v", requestURL, err)
	fakeServer.Start()
	defer fakeServer.Close()

	beatTest.Headers["X-Test"] = "test-value"
	beatTest.HostHeader = "beat.test.local"
	beatTest.Method = "POST"
	beatTest.Username = "admin"
	beatTest.Password = "PWD"

	require.NoError(t, beatTest.Gather(&beat6StatsAccumulator))
}
