From 6f9271fd18396f3b73f593d9d41354ca5c48921a Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Mon, 6 Jan 2025 05:23:35 +0530 Subject: [PATCH] test: add unit tests for translator, model, sql Signed-off-by: Shivanshu Raj Shrivastava --- .../messagingQueues/kafka/model_test.go | 237 +++++++++++++++++ .../messagingQueues/kafka/sql_test.go | 251 ++++++++++++++++++ .../messagingQueues/kafka/translator_test.go | 231 ++++++++++++++++ 3 files changed, 719 insertions(+) create mode 100644 pkg/query-service/app/integrations/messagingQueues/kafka/model_test.go create mode 100644 pkg/query-service/app/integrations/messagingQueues/kafka/sql_test.go create mode 100644 pkg/query-service/app/integrations/messagingQueues/kafka/translator_test.go diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/model_test.go b/pkg/query-service/app/integrations/messagingQueues/kafka/model_test.go new file mode 100644 index 00000000000..328ea580675 --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/model_test.go @@ -0,0 +1,237 @@ +package kafka + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestMessagingQueue(t *testing.T) { + tests := []struct { + name string + queue MessagingQueue + wantStart int64 + wantEnd int64 + wantVars map[string]string + }{ + { + name: "valid messaging queue", + queue: MessagingQueue{ + Start: time.Now().UnixNano(), + End: time.Now().Add(time.Hour).UnixNano(), + Variables: map[string]string{ + "topic": "test-topic", + "partition": "1", + "consumer_group": "test-group", + }, + }, + wantVars: map[string]string{ + "topic": "test-topic", + "partition": "1", + "consumer_group": "test-group", + }, + }, + { + name: "messaging queue with eval time", + queue: MessagingQueue{ + Start: time.Now().UnixNano(), + End: time.Now().Add(time.Hour).UnixNano(), + EvalTime: 1000000000, + Variables: map[string]string{ + "topic": "test-topic", + }, + }, + wantVars: map[string]string{ + "topic": "test-topic", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Greater(t, tt.queue.End, tt.queue.Start, "End time should be greater than start time") + assert.Equal(t, tt.wantVars, tt.queue.Variables, "Variables should match") + }) + } +} + +func TestClients(t *testing.T) { + tests := []struct { + name string + clients Clients + want int + }{ + { + name: "valid clients", + clients: Clients{ + Hash: map[string]struct{}{ + "client1": {}, + "client2": {}, + }, + ClientID: []string{"client1", "client2"}, + ServiceInstanceID: []string{"instance1", "instance2"}, + ServiceName: []string{"service1", "service2"}, + TopicName: []string{"topic1", "topic2"}, + }, + want: 2, + }, + { + name: "empty clients", + clients: Clients{ + Hash: make(map[string]struct{}), + ClientID: []string{}, + ServiceInstanceID: []string{}, + ServiceName: []string{}, + TopicName: []string{}, + }, + want: 0, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, len(tt.clients.Hash), "Hash length should match") + assert.Equal(t, tt.want, len(tt.clients.ClientID), "ClientID length should match") + assert.Equal(t, tt.want, len(tt.clients.ServiceInstanceID), "ServiceInstanceID length should match") + assert.Equal(t, tt.want, len(tt.clients.ServiceName), "ServiceName length should match") + assert.Equal(t, tt.want, len(tt.clients.TopicName), "TopicName length should match") + }) + } +} + +func TestQueueFilters(t *testing.T) { + tests := []struct { + name string + filters QueueFilters + want QueueFilters + }{ + { + name: "valid filters", + filters: QueueFilters{ + serviceName: "test-service", + spanName: "test-span", + queue: "test-queue", + destination: "test-destination", + kind: "test-kind", + }, + want: QueueFilters{ + serviceName: "test-service", + spanName: "test-span", + queue: "test-queue", + destination: "test-destination", + kind: "test-kind", + }, + }, + { + name: "empty filters", + filters: QueueFilters{ + serviceName: "", + spanName: "", + queue: "", + destination: "", + kind: "", + }, + want: QueueFilters{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, tt.filters, "Filters should match") + }) + } +} + +func TestCeleryTask(t *testing.T) { + tests := []struct { + name string + task CeleryTask + wantKind string + wantType string + }{ + { + name: "valid celery task", + task: CeleryTask{ + kind: "worker", + status: "active", + }, + wantKind: "worker", + wantType: "active", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.wantKind, tt.task.GetKind(), "Kind should match") + assert.Equal(t, tt.wantType, tt.task.GetStatus(), "Type should match") + + // Test Set method + tt.task.Set("new-type", "new-kind") + assert.Equal(t, "new-kind", tt.task.status, "Status should be updated to new-type") + }) + } +} + +func TestCeleryTaskInterface(t *testing.T) { + var task CeleryTasks = &CeleryTask{ + kind: "worker", + status: "active", + } + + t.Run("test interface implementation", func(t *testing.T) { + assert.Equal(t, "worker", task.GetKind(), "GetKind should return status") + assert.Equal(t, "active", task.GetStatus(), "GetType should return status") + + task.Set("new-type", "new-kind") + assert.Equal(t, "new-kind", task.GetStatus(), "Type should be updated") + assert.Equal(t, "new-type", task.GetKind(), "Kind should be updated") + }) +} + +func TestOnboardingResponse(t *testing.T) { + tests := []struct { + name string + response OnboardingResponse + want OnboardingResponse + }{ + { + name: "valid response", + response: OnboardingResponse{ + Attribute: "test-attribute", + Message: "test-message", + Status: "success", + }, + want: OnboardingResponse{ + Attribute: "test-attribute", + Message: "test-message", + Status: "success", + }, + }, + { + name: "error response", + response: OnboardingResponse{ + Attribute: "test-attribute", + Message: "error occurred", + Status: "error", + }, + want: OnboardingResponse{ + Attribute: "test-attribute", + Message: "error occurred", + Status: "error", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.want, tt.response, "Response should match") + }) + } +} + +func TestKafkaQueueConstant(t *testing.T) { + t.Run("kafka queue constant", func(t *testing.T) { + assert.Equal(t, "kafka", KafkaQueue, "KafkaQueue constant should be 'kafka'") + }) +} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/sql_test.go b/pkg/query-service/app/integrations/messagingQueues/kafka/sql_test.go new file mode 100644 index 00000000000..97d5924cd8f --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/sql_test.go @@ -0,0 +1,251 @@ +package kafka + +import ( + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestGenerateConsumerSQL(t *testing.T) { + start := time.Now().Add(-1 * time.Hour).UnixNano() + end := time.Now().UnixNano() + tests := []struct { + name string + topic string + partition string + consumerGroup string + queueType string + wantContains []string + }{ + { + name: "valid consumer query", + topic: "test-topic", + partition: "1", + consumerGroup: "test-group", + queueType: "kafka", + wantContains: []string{ + "FROM signoz_traces.distributed_signoz_index_v3", + "messaging.destination.name", + "messaging.destination.partition.id", + "messaging.kafka.consumer.group", + "kind = 5", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + query := generateConsumerSQL(start, end, tt.topic, tt.partition, tt.consumerGroup, tt.queueType) + for _, want := range tt.wantContains { + assert.Contains(t, query, want) + } + }) + } +} + +func TestGeneratePartitionLatencySQL(t *testing.T) { + start := time.Now().Add(-1 * time.Hour).UnixNano() + end := time.Now().UnixNano() + tests := []struct { + name string + queueType string + wantContains []string + }{ + { + name: "valid partition latency query", + queueType: "kafka", + wantContains: []string{ + "WITH partition_query AS", + "kind = 4", + "messaging.destination.name", + "messaging.destination.partition.id", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + query := generatePartitionLatencySQL(start, end, tt.queueType) + for _, want := range tt.wantContains { + assert.Contains(t, query, want) + } + }) + } +} + +func TestGenerateProducerConsumerEvalSQL(t *testing.T) { + start := time.Now().Add(-1 * time.Hour).UnixNano() + end := time.Now().UnixNano() + evalTime := int64(1000000000) // 1 second in nanoseconds + tests := []struct { + name string + queueType string + wantContains []string + }{ + { + name: "valid producer consumer eval query", + queueType: "kafka", + wantContains: []string{ + "WITH trace_data AS", + "INNER JOIN", + "p.kind = 4", + "c.kind = 5", + "breach_percentage", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + query := generateProducerConsumerEvalSQL(start, end, tt.queueType, evalTime) + for _, want := range tt.wantContains { + assert.Contains(t, query, want) + } + }) + } +} + +func TestGenerateNetworkLatencyThroughputSQL(t *testing.T) { + start := time.Now().Add(-1 * time.Hour).UnixNano() + end := time.Now().UnixNano() + tests := []struct { + name string + consumerGroup string + partitionID string + queueType string + wantContains []string + }{ + { + name: "valid network latency query", + consumerGroup: "test-group", + partitionID: "1", + queueType: "kafka", + wantContains: []string{ + "messaging.client_id", + "service.instance.id", + "messaging.kafka.consumer.group", + "kind = 5", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + query := generateNetworkLatencyThroughputSQL(start, end, tt.consumerGroup, tt.partitionID, tt.queueType) + for _, want := range tt.wantContains { + assert.Contains(t, query, want) + } + }) + } +} + +func TestOnboardProducersSQL(t *testing.T) { + start := time.Now().Add(-1 * time.Hour).UnixNano() + end := time.Now().UnixNano() + tests := []struct { + name string + queueType string + wantContains []string + }{ + { + name: "valid onboard producers query", + queueType: "kafka", + wantContains: []string{ + "COUNT(*) = 0 AS entries", + "messaging.destination.name", + "messaging.destination.partition.id", + "kind = 4", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + query := onboardProducersSQL(start, end, tt.queueType) + for _, want := range tt.wantContains { + assert.Contains(t, query, want) + } + }) + } +} + +func TestOnboardConsumerSQL(t *testing.T) { + start := time.Now().Add(-1 * time.Hour).UnixNano() + end := time.Now().UnixNano() + tests := []struct { + name string + queueType string + wantContains []string + }{ + { + name: "valid onboard consumers query", + queueType: "kafka", + wantContains: []string{ + "COUNT(*) = 0 AS entries", + "messaging.kafka.consumer.group", + "messaging.message.body.size", + "messaging.client_id", + "service.instance.id", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + query := onboardConsumerSQL(start, end, tt.queueType) + for _, want := range tt.wantContains { + assert.Contains(t, query, want) + } + }) + } +} + +// Helper function to test SQL query validity +func TestSQLSyntax(t *testing.T) { + start := time.Now().Add(-1 * time.Hour).UnixNano() + end := time.Now().UnixNano() + + tests := []struct { + name string + queryFn func() string + required []string + }{ + { + name: "Consumer SQL", + queryFn: func() string { + return generateConsumerSQL(start, end, "test-topic", "1", "test-group", "kafka") + }, + required: []string{"SELECT", "FROM", "WHERE", "GROUP BY"}, + }, + { + name: "Overview SQL", + queryFn: func() string { + return generateOverviewSQL(start, end) + }, + required: []string{"WITH", "SELECT", "FROM", "WHERE", "GROUP BY"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + query := tt.queryFn() + + // Check for basic SQL syntax elements + for _, req := range tt.required { + assert.True(t, strings.Contains(strings.ToUpper(query), req), + "Query should contain %s", req) + } + + // Check for balanced parentheses + openCount := strings.Count(query, "(") + closeCount := strings.Count(query, ")") + assert.Equal(t, openCount, closeCount, "Unbalanced parentheses in query") + + // Check for proper semicolon termination + assert.True(t, strings.TrimSpace(query)[len(strings.TrimSpace(query))-1] == ';', + "Query should end with semicolon") + }) + } +} diff --git a/pkg/query-service/app/integrations/messagingQueues/kafka/translator_test.go b/pkg/query-service/app/integrations/messagingQueues/kafka/translator_test.go new file mode 100644 index 00000000000..c8a62f963fb --- /dev/null +++ b/pkg/query-service/app/integrations/messagingQueues/kafka/translator_test.go @@ -0,0 +1,231 @@ +package kafka + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestBuildQRParamsWithCache(t *testing.T) { + tests := []struct { + name string + messagingQueue *MessagingQueue + queryContext string + attributeCache *Clients + wantErr bool + expectedPanel v3.PanelType + }{ + { + name: "throughput query", + messagingQueue: &MessagingQueue{ + Start: time.Now().Add(-1 * time.Hour).UnixNano(), + End: time.Now().UnixNano(), + Variables: map[string]string{ + "consumer_group": "test-group", + "partition": "1", + }, + }, + queryContext: "throughput", + wantErr: false, + expectedPanel: v3.PanelTypeTable, + }, + { + name: "fetch-latency query", + messagingQueue: &MessagingQueue{ + Start: time.Now().Add(-1 * time.Hour).UnixNano(), + End: time.Now().UnixNano(), + }, + queryContext: "fetch-latency", + attributeCache: &Clients{ + ServiceName: []string{"test-service"}, + ClientID: []string{"test-client"}, + ServiceInstanceID: []string{"test-instance"}, + }, + wantErr: false, + expectedPanel: v3.PanelTypeTable, + }, + { + name: "producer-throughput-overview query", + messagingQueue: &MessagingQueue{ + Start: time.Now().Add(-1 * time.Hour).UnixNano(), + End: time.Now().UnixNano(), + }, + queryContext: "producer-throughput-overview", + wantErr: false, + expectedPanel: v3.PanelTypeTable, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + params, err := BuildQRParamsWithCache(tt.messagingQueue, tt.queryContext, tt.attributeCache) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.NotNil(t, params) + assert.Equal(t, tt.expectedPanel, params.CompositeQuery.PanelType) + }) + } +} + +func TestBuildClickHouseQuery(t *testing.T) { + tests := []struct { + name string + messagingQueue *MessagingQueue + queueType string + queryContext string + wantErr bool + }{ + { + name: "overview query", + messagingQueue: &MessagingQueue{ + Start: time.Now().Add(-1 * time.Hour).UnixNano(), + End: time.Now().UnixNano(), + }, + queueType: KafkaQueue, + queryContext: "overview", + wantErr: false, + }, + { + name: "producer query", + messagingQueue: &MessagingQueue{ + Start: time.Now().Add(-1 * time.Hour).UnixNano(), + End: time.Now().UnixNano(), + Variables: map[string]string{ + "topic": "test-topic", + "partition": "1", + }, + }, + queueType: KafkaQueue, + queryContext: "producer", + wantErr: false, + }, + { + name: "consumer query", + messagingQueue: &MessagingQueue{ + Start: time.Now().Add(-1 * time.Hour).UnixNano(), + End: time.Now().UnixNano(), + Variables: map[string]string{ + "topic": "test-topic", + "partition": "1", + "consumer_group": "test-group", + }, + }, + queueType: KafkaQueue, + queryContext: "consumer", + wantErr: false, + }, + { + name: "missing required variables", + messagingQueue: &MessagingQueue{ + Start: time.Now().Add(-1 * time.Hour).UnixNano(), + End: time.Now().UnixNano(), + Variables: map[string]string{}, + }, + queueType: KafkaQueue, + queryContext: "producer", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + query, err := BuildClickHouseQuery(tt.messagingQueue, tt.queueType, tt.queryContext) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.NotEmpty(t, query.Query) + }) + } +} + +func TestBuildCompositeQuery(t *testing.T) { + tests := []struct { + name string + chq *v3.ClickHouseQuery + queryContext string + wantPanel v3.PanelType + wantErr bool + }{ + { + name: "producer-consumer-eval query", + chq: &v3.ClickHouseQuery{ + Query: "SELECT * FROM test", + }, + queryContext: "producer-consumer-eval", + wantPanel: v3.PanelTypeList, + wantErr: false, + }, + { + name: "regular query", + chq: &v3.ClickHouseQuery{ + Query: "SELECT * FROM test", + }, + queryContext: "regular-query", + wantPanel: v3.PanelTypeTable, + wantErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cq, err := buildCompositeQuery(tt.chq, tt.queryContext) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.Equal(t, tt.wantPanel, cq.PanelType) + assert.NotNil(t, cq.ClickHouseQueries[tt.queryContext]) + }) + } +} + +func TestBuildQueryRangeParams(t *testing.T) { + tests := []struct { + name string + messagingQueue *MessagingQueue + queryContext string + wantErr bool + }{ + { + name: "valid query", + messagingQueue: &MessagingQueue{ + Start: time.Now().Add(-1 * time.Hour).UnixNano(), + End: time.Now().UnixNano(), + }, + queryContext: "overview", + wantErr: false, + }, + { + name: "disabled span evaluation", + messagingQueue: &MessagingQueue{ + Start: time.Now().Add(-1 * time.Hour).UnixNano(), + End: time.Now().UnixNano(), + }, + queryContext: "producer-consumer-eval", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + params, err := BuildQueryRangeParams(tt.messagingQueue, tt.queryContext) + if tt.wantErr { + assert.Error(t, err) + return + } + assert.NoError(t, err) + assert.NotNil(t, params) + assert.Equal(t, defaultStepInterval, params.Step) + assert.Equal(t, "v4", params.Version) + assert.True(t, params.FormatForWeb) + }) + } +}