Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Oct 6, 2023
1 parent 11d985e commit d742c46
Show file tree
Hide file tree
Showing 4 changed files with 3,979 additions and 2,835 deletions.
34 changes: 34 additions & 0 deletions e2e_tests/api_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package e2e_tests

import (
"context"
"errors"
"fmt"
"net/http"
"testing"

"github.com/stretchr/testify/require"
)

// createApiServer starts a stub HTTP API server on 0.0.0.0:port that answers
// every request with 200 OK. It is registered with t.Cleanup so the listener
// is shut down (and the port released) when the test finishes.
func createApiServer(t *testing.T, port int) {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
		writer.WriteHeader(http.StatusOK)
	})

	s := &http.Server{
		Addr:    fmt.Sprintf("0.0.0.0:%d", port),
		Handler: mux,
	}

	// NOTE(review): the previous version created a context.WithCancel and only
	// invoked its cancel func here, which never stopped the server. Shut the
	// server down explicitly so the goroutine below exits and the port is freed.
	t.Cleanup(func() {
		require.NoError(t, s.Shutdown(context.Background()))
	})

	go func() {
		// ErrServerClosed is the expected result of Shutdown; anything else is
		// a genuine listen/serve failure.
		if err := s.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
			require.NoError(t, err)
		}
	}()
}
164 changes: 135 additions & 29 deletions e2e_tests/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,25 @@ import (
// make cert-manager
// kubectl get csr -o=jsonpath='{range.items[?(@.spec.signerName=="kubernetes.io/kubelet-serving")]}{.metadata.name}{" "}{end}' | xargs kubectl certificate approve

// When running tests you can use the following env vars to help with local development:
// SKIP_SETUP: skip setting up the chart and apps. Useful if they are already deployed.
// SKIP_TEARDOWN: skip deleting the chart and apps as part of cleanup. Useful to keep around for local development.

const (
	// Path of the kubeconfig used to reach the e2e test cluster.
	testKubeConfig = "/tmp/kube-config-splunk-otel-collector-chart-e2e-testing"
	// Local ports the in-process test receivers bind to; the chart values file
	// is templated (see deployChartsAndApps) to point the collector at them.
	hecReceiverPort                        = 8090  // Splunk HEC endpoint for logs ($LOG_HEC_ENDPOINT)
	hecMetricsReceiverPort                 = 8091  // Splunk HEC endpoint for metrics ($METRIC_HEC_ENDPOINT)
	signalFxReceiverPort                   = 9443  // SignalFx ingest for agent metrics ($AGENT_ENDPOINT)
	signalFxReceiverK8sClusterReceiverPort = 19443 // SignalFx ingest for the k8s cluster receiver ($K8S_CLUSTER_ENDPOINT)
	otlpReceiverPort                       = 4317  // OTLP endpoint ($OTLP_ENDPOINT)
	apiPort                                = 8881  // stub API server, see createApiServer ($API_URL_ENDPOINT)
)

// setupRun guards the one-time environment setup performed by setupOnce; the
// zero value of sync.Once is ready to use.
var setupRun sync.Once

type sinks struct {
logsConsumer *consumertest.LogsSink
hecMetricsConsumer *consumertest.MetricsSink
agentMetricsConsumer *consumertest.MetricsSink
k8sclusterReceiverMetricsConsumer *consumertest.MetricsSink
tracesConsumer *consumertest.TracesSink
Expand All @@ -71,24 +78,28 @@ var globalSinks *sinks

// setupOnce lazily initializes the shared test environment exactly once per
// test binary: it starts the stub API server, all ingest receivers (HEC,
// SignalFx, traces), and — unless SKIP_SETUP=true — deploys the chart and
// test applications. Every test calls this to obtain the shared sinks.
func setupOnce(t *testing.T) *sinks {
	setupRun.Do(func() {
		// create an API server
		createApiServer(t, apiPort)
		// set ingest pipelines
		logs, metrics := setupHEC(t)
		globalSinks = &sinks{
			logsConsumer:                      logs,
			hecMetricsConsumer:                metrics,
			agentMetricsConsumer:              setupSignalfxReceiver(t, signalFxReceiverPort),
			k8sclusterReceiverMetricsConsumer: setupSignalfxReceiver(t, signalFxReceiverK8sClusterReceiverPort),
			tracesConsumer:                    setupTraces(t),
		}
		// deploy the chart and applications.
		if os.Getenv("SKIP_SETUP") == "true" {
			t.Log("Skipping setup as SKIP_SETUP is set to true")
			return
		}
		deployChartsAndApps(t)
	})

	return globalSinks
}
func setup(t *testing.T) {
func deployChartsAndApps(t *testing.T) {
kubeConfig, err := clientcmd.BuildConfigFromFlags("", testKubeConfig)
require.NoError(t, err)
clientset, err := kubernetes.NewForConfig(kubeConfig)
Expand All @@ -102,7 +113,9 @@ func setup(t *testing.T) {
valuesStr := strings.ReplaceAll(string(valuesBytes), "$K8S_CLUSTER_ENDPOINT", fmt.Sprintf("http://%s:%d", hostEndpoint(t), signalFxReceiverK8sClusterReceiverPort))
valuesStr = strings.ReplaceAll(valuesStr, "$AGENT_ENDPOINT", fmt.Sprintf("http://%s:%d", hostEndpoint(t), signalFxReceiverPort))
valuesStr = strings.ReplaceAll(valuesStr, "$LOG_HEC_ENDPOINT", fmt.Sprintf("http://%s:%d", hostEndpoint(t), hecReceiverPort))
valuesStr = strings.ReplaceAll(valuesStr, "$METRIC_HEC_ENDPOINT", fmt.Sprintf("http://%s:%d/services/collector", hostEndpoint(t), hecMetricsReceiverPort))
valuesStr = strings.ReplaceAll(valuesStr, "$OTLP_ENDPOINT", fmt.Sprintf("%s:%d", hostEndpoint(t), otlpReceiverPort))
valuesStr = strings.ReplaceAll(valuesStr, "$API_URL_ENDPOINT", fmt.Sprintf("http://%s:%d", hostEndpoint(t), apiPort))
var values map[string]interface{}
err = yaml.Unmarshal([]byte(valuesStr), &values)
require.NoError(t, err)
Expand Down Expand Up @@ -145,6 +158,21 @@ func setup(t *testing.T) {
}

waitForAllDeploymentsToStart(t, clientset)

t.Cleanup(func() {
if os.Getenv("SKIP_TEARDOWN") == "true" {
t.Log("Skipping teardown as SKIP_TEARDOWN is set to true")
return
}
waitTime := int64(0)
_ = deployments.Delete(context.Background(), "nodejs-test", metav1.DeleteOptions{
GracePeriodSeconds: &waitTime,
})
uninstall := action.NewUninstall(actionConfig)
uninstall.IgnoreNotFound = true
uninstall.Wait = true
_, _ = uninstall.Run("sock")
})
}

func Test_NodeJSTraces(t *testing.T) {
Expand Down Expand Up @@ -178,14 +206,33 @@ func Test_NodeJSTraces(t *testing.T) {
func Test_KubernetesClusterReceiverMetrics(t *testing.T) {
metricsConsumer := setupOnce(t).k8sclusterReceiverMetricsConsumer

waitForMetrics(t, 3, metricsConsumer)
waitForMetrics(t, 10, metricsConsumer)

expectedMetricsFile := filepath.Join("testdata", "expected_cluster_receiver.yaml")
expectedMetrics, err := readMetrics(expectedMetricsFile)
require.NoError(t, err)

var metricToCompare pmetric.Metrics
OUTER:
for _, m := range metricsConsumer.AllMetrics() {
for i := 0; i < m.ResourceMetrics().Len(); i++ {
rm := m.ResourceMetrics().At(i)
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
sm := rm.ScopeMetrics().At(j)
for k := 0; k < sm.Metrics().Len(); k++ {
metric := sm.Metrics().At(k)
if metric.Name() == "k8s.deployment.desired" {
metricToCompare = m
break OUTER
}
}
}
}
}
require.True(t, metricToCompare.MetricCount() > 0)

require.NoError(t,
pmetrictest.CompareMetrics(expectedMetrics, metricsConsumer.AllMetrics()[len(metricsConsumer.AllMetrics())-1],
pmetrictest.CompareMetrics(expectedMetrics, metricToCompare,
pmetrictest.IgnoreTimestamp(),
pmetrictest.IgnoreStartTimestamp(),
pmetrictest.IgnoreMetricValues("k8s.deployment.desired", "k8s.deployment.available", "k8s.container.restarts", "k8s.container.cpu_request", "k8s.container.memory_request", "k8s.container.memory_limit"),
Expand Down Expand Up @@ -244,7 +291,12 @@ func Test_AgentMetrics(t *testing.T) {
agentMetricsConsumer := setupOnce(t).agentMetricsConsumer

metricNames := []string{
"otelcol_exporter_send_failed_log_records",
"container.filesystem.available",
"container.filesystem.capacity",
"container.filesystem.usage",
"container.memory.usage",
"k8s.pod.network.errors",
"k8s.pod.network.io",
"otelcol_exporter_sent_log_records",
"otelcol_otelsvc_k8s_ip_lookup_miss",
"otelcol_processor_refused_log_records",
Expand All @@ -253,7 +305,43 @@ func Test_AgentMetrics(t *testing.T) {
"otelcol_processor_batch_batch_send_size_sum",
"otelcol_processor_batch_batch_send_size_count",
"otelcol_processor_batch_batch_send_size_bucket",
"otelcol_otelsvc_k8s_namespace_updated",
"otelcol_exporter_queue_size",
"otelcol_exporter_sent_metric_points",
"otelcol_otelsvc_k8s_namespace_added",
"otelcol_otelsvc_k8s_pod_added",
"otelcol_otelsvc_k8s_pod_table_size",
"otelcol_otelsvc_k8s_pod_updated",
"otelcol_process_cpu_seconds",
"otelcol_process_memory_rss",
"otelcol_process_runtime_heap_alloc_bytes",
"otelcol_process_runtime_total_alloc_bytes",
"otelcol_process_runtime_total_sys_memory_bytes",
"otelcol_process_uptime",
"otelcol_processor_accepted_metric_points",
"otelcol_processor_batch_timeout_trigger_send",
"otelcol_processor_dropped_metric_points",
"otelcol_processor_refused_metric_points",
"otelcol_receiver_accepted_metric_points",
"otelcol_receiver_refused_metric_points",
"otelcol_scraper_errored_metric_points",
"otelcol_scraper_scraped_metric_points",
"system.cpu.load_average.15m",
"system.cpu.load_average.1m",
"system.cpu.load_average.5m",
"system.disk.operations",
"system.filesystem.usage",
"system.memory.usage",
"system.network.errors",
"system.network.io",
"system.paging.operations",
}
checkMetricsAreEmitted(t, agentMetricsConsumer, metricNames)
}

func Test_HECMetrics(t *testing.T) {
hecMetricsConsumer := setupOnce(t).hecMetricsConsumer

metricNames := []string{
"container.cpu.time",
"container.cpu.utilization",
"container.filesystem.available",
Expand Down Expand Up @@ -281,8 +369,15 @@ func Test_AgentMetrics(t *testing.T) {
"k8s.pod.network.errors",
"k8s.pod.network.io",
"otelcol_exporter_queue_size",
"otelcol_exporter_send_failed_metric_points",
"otelcol_exporter_sent_metric_points",
"otelcol_exporter_sent_log_records",
"otelcol_otelsvc_k8s_ip_lookup_miss",
"otelcol_processor_refused_log_records",
"otelcol_processor_dropped_log_records",
"otelcol_processor_accepted_log_records",
"otelcol_processor_batch_batch_send_size_sum",
"otelcol_processor_batch_batch_send_size_count",
"otelcol_processor_batch_batch_send_size_bucket",
"otelcol_otelsvc_k8s_namespace_added",
"otelcol_otelsvc_k8s_pod_added",
"otelcol_otelsvc_k8s_pod_table_size",
Expand Down Expand Up @@ -326,7 +421,7 @@ func Test_AgentMetrics(t *testing.T) {
"system.processes.count",
"system.processes.created",
}
checkMetricsAreEmitted(t, agentMetricsConsumer, metricNames)
checkMetricsAreEmitted(t, hecMetricsConsumer, metricNames)
}

func waitForAllDeploymentsToStart(t *testing.T, clientset *kubernetes.Clientset) {
Expand Down Expand Up @@ -360,7 +455,7 @@ func setupTraces(t *testing.T) *consumertest.TracesSink {
return tc
}

func setupMetrics(t *testing.T, port int) *consumertest.MetricsSink {
func setupSignalfxReceiver(t *testing.T, port int) *consumertest.MetricsSink {
mc := new(consumertest.MetricsSink)
f := signalfxreceiver.NewFactory()
cfg := f.CreateDefaultConfig().(*signalfxreceiver.Config)
Expand All @@ -378,63 +473,74 @@ func setupMetrics(t *testing.T, port int) *consumertest.MetricsSink {
return mc
}

func setupLogs(t *testing.T) *consumertest.LogsSink {
func setupHEC(t *testing.T) (*consumertest.LogsSink, *consumertest.MetricsSink) {
// the splunkhecreceiver does poorly at receiving logs and metrics. Use separate ports for now.
f := splunkhecreceiver.NewFactory()
cfg := f.CreateDefaultConfig().(*splunkhecreceiver.Config)
cfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", hecReceiverPort)

mCfg := f.CreateDefaultConfig().(*splunkhecreceiver.Config)
mCfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", hecMetricsReceiverPort)

lc := new(consumertest.LogsSink)
mc := new(consumertest.MetricsSink)
rcvr, err := f.CreateLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, lc)
mrcvr, err := f.CreateMetricsReceiver(context.Background(), receivertest.NewNopCreateSettings(), mCfg, mc)
require.NoError(t, err)

require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, err, "failed creating logs receiver")
t.Cleanup(func() {
assert.NoError(t, rcvr.Shutdown(context.Background()))
})

require.NoError(t, mrcvr.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, err, "failed creating metrics receiver")
t.Cleanup(func() {
assert.NoError(t, mrcvr.Shutdown(context.Background()))
})

return lc
return lc, mc
}

func checkMetricsAreEmitted(t *testing.T, mc *consumertest.MetricsSink, metricNames []string) {
metricsToFind := map[string]bool{}
for _, name := range metricNames {
metricsToFind[name] = false
}
var stillMissing []string
timeoutMinutes := 3
require.Eventuallyf(t, func() bool {

if len(mc.AllMetrics()) == 0 {
return false
}
m := mc.AllMetrics()[len(mc.AllMetrics())-1]
for i := 0; i < m.ResourceMetrics().Len(); i++ {
rm := m.ResourceMetrics().At(i)
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
sm := rm.ScopeMetrics().At(j)
for k := 0; k < sm.Metrics().Len(); k++ {
metric := sm.Metrics().At(k)
metricsToFind[metric.Name()] = true
for _, m := range mc.AllMetrics() {
for i := 0; i < m.ResourceMetrics().Len(); i++ {
rm := m.ResourceMetrics().At(i)
for j := 0; j < rm.ScopeMetrics().Len(); j++ {
sm := rm.ScopeMetrics().At(j)
for k := 0; k < sm.Metrics().Len(); k++ {
metric := sm.Metrics().At(k)
metricsToFind[metric.Name()] = true
}
}
}
}
stillMissing = []string{}
var stillMissing []string
var found []string
missingCount := 0
foundCount := 0
for _, name := range metricNames {
if !metricsToFind[name] {
stillMissing = append(stillMissing, name)
missingCount++
} else {
found = append(found, name)
foundCount++
}
}
t.Logf("Found: %s", strings.Join(found, ","))
t.Logf("Metrics found: %d, metrics still missing: %d\n%s\n", foundCount, missingCount, strings.Join(stillMissing, ","))
return missingCount == 0
}, time.Duration(timeoutMinutes)*time.Minute, 1*time.Second,
"failed to receive all metrics %d minutes, missing metrics: %s", timeoutMinutes, strings.Join(stillMissing, ","))
}, time.Duration(timeoutMinutes)*time.Minute, 10*time.Second,
"failed to receive all metrics %d minutes", timeoutMinutes)
}

func hostEndpoint(t *testing.T) string {
Expand Down
Loading

0 comments on commit d742c46

Please sign in to comment.