From 8df00c779852089c4a0d0b883959ad4836ca0e1f Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sat, 9 Nov 2024 22:08:00 +0530 Subject: [PATCH] chore: add k8s statefulset resources (#6409) --- pkg/query-service/app/http_handler.go | 20 +- pkg/query-service/app/infra.go | 104 ++++ pkg/query-service/app/inframetrics/common.go | 8 + pkg/query-service/app/inframetrics/jobs.go | 498 ++++++++++++++++++ .../app/inframetrics/statefulsets.go | 444 ++++++++++++++++ pkg/query-service/model/infra.go | 62 +++ 6 files changed, 1134 insertions(+), 2 deletions(-) create mode 100644 pkg/query-service/app/inframetrics/jobs.go create mode 100644 pkg/query-service/app/inframetrics/statefulsets.go diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index aa5a5cb137..b62e015020 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -119,8 +119,10 @@ type APIHandler struct { namespacesRepo *inframetrics.NamespacesRepo clustersRepo *inframetrics.ClustersRepo // workloads - deploymentsRepo *inframetrics.DeploymentsRepo - daemonsetsRepo *inframetrics.DaemonSetsRepo + deploymentsRepo *inframetrics.DeploymentsRepo + daemonsetsRepo *inframetrics.DaemonSetsRepo + statefulsetsRepo *inframetrics.StatefulSetsRepo + jobsRepo *inframetrics.JobsRepo } type APIHandlerOpts struct { @@ -198,6 +200,8 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { clustersRepo := inframetrics.NewClustersRepo(opts.Reader, querierv2) deploymentsRepo := inframetrics.NewDeploymentsRepo(opts.Reader, querierv2) daemonsetsRepo := inframetrics.NewDaemonSetsRepo(opts.Reader, querierv2) + statefulsetsRepo := inframetrics.NewStatefulSetsRepo(opts.Reader, querierv2) + jobsRepo := inframetrics.NewJobsRepo(opts.Reader, querierv2) aH := &APIHandler{ reader: opts.Reader, @@ -224,6 +228,8 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { clustersRepo: clustersRepo, deploymentsRepo: deploymentsRepo, daemonsetsRepo: 
daemonsetsRepo, + statefulsetsRepo: statefulsetsRepo, + jobsRepo: jobsRepo, } logsQueryBuilder := logsv3.PrepareLogsQuery @@ -412,6 +418,16 @@ func (aH *APIHandler) RegisterInfraMetricsRoutes(router *mux.Router, am *AuthMid daemonsetsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getDaemonSetAttributeKeys)).Methods(http.MethodGet) daemonsetsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getDaemonSetAttributeValues)).Methods(http.MethodGet) daemonsetsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getDaemonSetList)).Methods(http.MethodPost) + + statefulsetsSubRouter := router.PathPrefix("/api/v1/statefulsets").Subrouter() + statefulsetsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getStatefulSetAttributeKeys)).Methods(http.MethodGet) + statefulsetsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getStatefulSetAttributeValues)).Methods(http.MethodGet) + statefulsetsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getStatefulSetList)).Methods(http.MethodPost) + + jobsSubRouter := router.PathPrefix("/api/v1/jobs").Subrouter() + jobsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getJobAttributeKeys)).Methods(http.MethodGet) + jobsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getJobAttributeValues)).Methods(http.MethodGet) + jobsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getJobList)).Methods(http.MethodPost) } func (aH *APIHandler) RegisterWebSocketPaths(router *mux.Router, am *AuthMiddleware) { diff --git a/pkg/query-service/app/infra.go b/pkg/query-service/app/infra.go index 1cd22ba244..b1f741e244 100644 --- a/pkg/query-service/app/infra.go +++ b/pkg/query-service/app/infra.go @@ -440,3 +440,107 @@ func (aH *APIHandler) getDaemonSetList(w http.ResponseWriter, r *http.Request) { aH.Respond(w, daemonSetList) } + +func (aH *APIHandler) getStatefulSetAttributeKeys(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := parseFilterAttributeKeyRequest(r) + if err != nil { + 
RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + keys, err := aH.statefulsetsRepo.GetStatefulSetAttributeKeys(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, keys) +} + +func (aH *APIHandler) getStatefulSetAttributeValues(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := parseFilterAttributeValueRequest(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + values, err := aH.statefulsetsRepo.GetStatefulSetAttributeValues(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, values) +} + +func (aH *APIHandler) getStatefulSetList(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req := model.StatefulSetListRequest{} + + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + statefulSetList, err := aH.statefulsetsRepo.GetStatefulSetList(ctx, req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, statefulSetList) +} + +func (aH *APIHandler) getJobAttributeKeys(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := parseFilterAttributeKeyRequest(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + keys, err := aH.jobsRepo.GetJobAttributeKeys(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + aH.Respond(w, keys) +} + +func (aH *APIHandler) getJobAttributeValues(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := parseFilterAttributeValueRequest(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: 
model.ErrorInternal, Err: err}, nil) + return + } + + values, err := aH.jobsRepo.GetJobAttributeValues(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + aH.Respond(w, values) +} + +func (aH *APIHandler) getJobList(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req := model.JobListRequest{} + + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + jobList, err := aH.jobsRepo.GetJobList(ctx, req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, jobList) +} diff --git a/pkg/query-service/app/inframetrics/common.go b/pkg/query-service/app/inframetrics/common.go index 2ffc859936..c4c280cb98 100644 --- a/pkg/query-service/app/inframetrics/common.go +++ b/pkg/query-service/app/inframetrics/common.go @@ -81,6 +81,14 @@ func getParamsForTopDaemonSets(req model.DaemonSetListRequest) (int64, string, s return getParamsForTopItems(req.Start, req.End) } +func getParamsForTopStatefulSets(req model.StatefulSetListRequest) (int64, string, string) { + return getParamsForTopItems(req.Start, req.End) +} + +func getParamsForTopJobs(req model.JobListRequest) (int64, string, string) { + return getParamsForTopItems(req.Start, req.End) +} + // TODO(srikanthccv): remove this // What is happening here? 
// The `PrepareTimeseriesFilterQuery` uses the local time series table for sub-query because each fingerprint diff --git a/pkg/query-service/app/inframetrics/jobs.go b/pkg/query-service/app/inframetrics/jobs.go new file mode 100644 index 0000000000..42300f0b87 --- /dev/null +++ b/pkg/query-service/app/inframetrics/jobs.go @@ -0,0 +1,498 @@ +package inframetrics + +import ( + "context" + "math" + "sort" + + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" + "go.signoz.io/signoz/pkg/query-service/common" + "go.signoz.io/signoz/pkg/query-service/interfaces" + "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/postprocess" + "golang.org/x/exp/slices" +) + +var ( + metricToUseForJobs = "k8s_pod_cpu_utilization" + k8sJobNameAttrKey = "k8s_job_name" + + metricNamesForJobs = map[string]string{ + "desired_successful_pods": "k8s_job_desired_successful_pods", + "active_pods": "k8s_job_active_pods", + "failed_pods": "k8s_job_failed_pods", + "successful_pods": "k8s_job_successful_pods", + } + + jobAttrsToEnrich = []string{ + "k8s_job_name", + "k8s_namespace_name", + "k8s_cluster_name", + } + + queryNamesForJobs = map[string][]string{ + "cpu": {"A"}, + "cpu_request": {"B", "A"}, + "cpu_limit": {"C", "A"}, + "memory": {"D"}, + "memory_request": {"E", "D"}, + "memory_limit": {"F", "D"}, + "restarts": {"G", "A"}, + "desired_pods": {"H"}, + "active_pods": {"I"}, + "failed_pods": {"J"}, + "successful_pods": {"K"}, + } + + builderQueriesForJobs = map[string]*v3.BuilderQuery{ + // desired nodes + "H": { + QueryName: "H", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForJobs["desired_successful_pods"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "H", + ReduceTo: 
v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + // active pods + "I": { + QueryName: "I", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForJobs["active_pods"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "I", + ReduceTo: v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + // failed pods + "J": { + QueryName: "J", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForJobs["failed_pods"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "J", + ReduceTo: v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + // successful pods + "K": { + QueryName: "K", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForJobs["successful_pods"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "K", + ReduceTo: v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + } + + jobQueryNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K"} +) + +type JobsRepo struct { + reader interfaces.Reader + querierV2 interfaces.Querier +} + +func NewJobsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *JobsRepo { + return 
&JobsRepo{reader: reader, querierV2: querierV2} +} + +func (d *JobsRepo) GetJobAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { + // TODO(srikanthccv): remove hardcoded metric name and support keys from any pod metric + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = metricToUseForJobs + if req.Limit == 0 { + req.Limit = 50 + } + + attributeKeysResponse, err := d.reader.GetMetricAttributeKeys(ctx, &req) + if err != nil { + return nil, err + } + + // TODO(srikanthccv): only return resource attributes when we have a way to + // distinguish between resource attributes and other attributes. + filteredKeys := []v3.AttributeKey{} + for _, key := range attributeKeysResponse.AttributeKeys { + if slices.Contains(pointAttrsToIgnore, key.Key) { + continue + } + filteredKeys = append(filteredKeys, key) + } + + return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil +} + +func (d *JobsRepo) GetJobAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = metricToUseForJobs + if req.Limit == 0 { + req.Limit = 50 + } + + attributeValuesResponse, err := d.reader.GetMetricAttributeValues(ctx, &req) + if err != nil { + return nil, err + } + + return attributeValuesResponse, nil +} + +func (d *JobsRepo) getMetadataAttributes(ctx context.Context, req model.JobListRequest) (map[string]map[string]string, error) { + jobAttrs := map[string]map[string]string{} + + for _, key := range jobAttrsToEnrich { + hasKey := false + for _, groupByKey := range req.GroupBy { + if groupByKey.Key == key { + hasKey = true + break + } + } + if !hasKey { + req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key}) + } + } + + mq := v3.BuilderQuery{ + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricToUseForJobs, + DataType: 
v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + GroupBy: req.GroupBy, + } + + query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq) + if err != nil { + return nil, err + } + + query = localQueryToDistributedQuery(query) + + attrsListResponse, err := d.reader.GetListResultV3(ctx, query) + if err != nil { + return nil, err + } + + for _, row := range attrsListResponse { + stringData := map[string]string{} + for key, value := range row.Data { + if str, ok := value.(string); ok { + stringData[key] = str + } else if strPtr, ok := value.(*string); ok { + stringData[key] = *strPtr + } + } + + jobName := stringData[k8sJobNameAttrKey] + if _, ok := jobAttrs[jobName]; !ok { + jobAttrs[jobName] = map[string]string{} + } + + for _, key := range req.GroupBy { + jobAttrs[jobName][key.Key] = stringData[key.Key] + } + } + + return jobAttrs, nil +} + +func (d *JobsRepo) getTopJobGroups(ctx context.Context, req model.JobListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) { + step, timeSeriesTableName, samplesTableName := getParamsForTopJobs(req) + + queryNames := queryNamesForJobs[req.OrderBy.ColumnName] + topJobGroupsQueryRangeParams := &v3.QueryRangeParamsV3{ + Start: req.Start, + End: req.End, + Step: step, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{}, + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeTable, + }, + } + + for _, queryName := range queryNames { + query := q.CompositeQuery.BuilderQueries[queryName].Clone() + query.StepInterval = step + query.MetricTableHints = &v3.MetricTableHints{ + TimeSeriesTableName: timeSeriesTableName, + SamplesTableName: samplesTableName, + } + if req.Filters != nil && len(req.Filters.Items) > 0 { + if query.Filters == nil { + query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}} + } + query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) 
+ } + topJobGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query + } + + queryResponse, _, err := d.querierV2.QueryRange(ctx, topJobGroupsQueryRangeParams) + if err != nil { + return nil, nil, err + } + formattedResponse, err := postprocess.PostProcessResult(queryResponse, topJobGroupsQueryRangeParams) + if err != nil { + return nil, nil, err + } + + if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 { + return nil, nil, nil + } + + if req.OrderBy.Order == v3.DirectionDesc { + sort.Slice(formattedResponse[0].Series, func(i, j int) bool { + return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value + }) + } else { + sort.Slice(formattedResponse[0].Series, func(i, j int) bool { + return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value + }) + } + + limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series))) + + paginatedTopJobGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)] + + topJobGroups := []map[string]string{} + for _, series := range paginatedTopJobGroupsSeries { + topJobGroups = append(topJobGroups, series.Labels) + } + allJobGroups := []map[string]string{} + for _, series := range formattedResponse[0].Series { + allJobGroups = append(allJobGroups, series.Labels) + } + + return topJobGroups, allJobGroups, nil +} + +func (d *JobsRepo) GetJobList(ctx context.Context, req model.JobListRequest) (model.JobListResponse, error) { + resp := model.JobListResponse{} + + if req.Limit == 0 { + req.Limit = 10 + } + + if req.OrderBy == nil { + req.OrderBy = &v3.OrderBy{ColumnName: "desired_pods", Order: v3.DirectionDesc} + } + + if req.GroupBy == nil { + req.GroupBy = []v3.AttributeKey{{Key: k8sJobNameAttrKey}} + resp.Type = model.ResponseTypeList + } else { + resp.Type = model.ResponseTypeGroupedList + } + + step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60)) + 
+ query := WorkloadTableListQuery.Clone() + + query.Start = req.Start + query.End = req.End + query.Step = step + + // add additional queries for jobs + for _, jobQuery := range builderQueriesForJobs { + query.CompositeQuery.BuilderQueries[jobQuery.QueryName] = jobQuery + } + + for _, query := range query.CompositeQuery.BuilderQueries { + query.StepInterval = step + if req.Filters != nil && len(req.Filters.Items) > 0 { + if query.Filters == nil { + query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}} + } + query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) + } + query.GroupBy = req.GroupBy + // make sure we only get records for jobs + query.Filters.Items = append(query.Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{Key: k8sJobNameAttrKey}, + Operator: v3.FilterOperatorExists, + }) + } + + jobAttrs, err := d.getMetadataAttributes(ctx, req) + if err != nil { + return resp, err + } + + topJobGroups, allJobGroups, err := d.getTopJobGroups(ctx, req, query) + if err != nil { + return resp, err + } + + groupFilters := map[string][]string{} + for _, topJobGroup := range topJobGroups { + for k, v := range topJobGroup { + groupFilters[k] = append(groupFilters[k], v) + } + } + + for groupKey, groupValues := range groupFilters { + hasGroupFilter := false + if req.Filters != nil && len(req.Filters.Items) > 0 { + for _, filter := range req.Filters.Items { + if filter.Key.Key == groupKey { + hasGroupFilter = true + break + } + } + } + + if !hasGroupFilter { + for _, query := range query.CompositeQuery.BuilderQueries { + query.Filters.Items = append(query.Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{Key: groupKey}, + Value: groupValues, + Operator: v3.FilterOperatorIn, + }) + } + } + } + + queryResponse, _, err := d.querierV2.QueryRange(ctx, query) + if err != nil { + return resp, err + } + + formattedResponse, err := postprocess.PostProcessResult(queryResponse, query) + if err != nil { + return resp, err + } + + records := 
[]model.JobListRecord{} + + for _, result := range formattedResponse { + for _, row := range result.Table.Rows { + + record := model.JobListRecord{ + JobName: "", + CPUUsage: -1, + CPURequest: -1, + CPULimit: -1, + MemoryUsage: -1, + MemoryRequest: -1, + MemoryLimit: -1, + DesiredSuccessfulPods: -1, + ActivePods: -1, + FailedPods: -1, + SuccessfulPods: -1, + } + + if jobName, ok := row.Data[k8sJobNameAttrKey].(string); ok { + record.JobName = jobName + } + + if cpu, ok := row.Data["A"].(float64); ok { + record.CPUUsage = cpu + } + if cpuRequest, ok := row.Data["B"].(float64); ok { + record.CPURequest = cpuRequest + } + + if cpuLimit, ok := row.Data["C"].(float64); ok { + record.CPULimit = cpuLimit + } + + if memory, ok := row.Data["D"].(float64); ok { + record.MemoryUsage = memory + } + + if memoryRequest, ok := row.Data["E"].(float64); ok { + record.MemoryRequest = memoryRequest + } + + if memoryLimit, ok := row.Data["F"].(float64); ok { + record.MemoryLimit = memoryLimit + } + + if restarts, ok := row.Data["G"].(float64); ok { + record.Restarts = int(restarts) + } + + if desiredSuccessfulPods, ok := row.Data["H"].(float64); ok { + record.DesiredSuccessfulPods = int(desiredSuccessfulPods) + } + + if activePods, ok := row.Data["I"].(float64); ok { + record.ActivePods = int(activePods) + } + + if failedPods, ok := row.Data["J"].(float64); ok { + record.FailedPods = int(failedPods) + } + + if successfulPods, ok := row.Data["K"].(float64); ok { + record.SuccessfulPods = int(successfulPods) + } + + record.Meta = map[string]string{} + if _, ok := jobAttrs[record.JobName]; ok { + record.Meta = jobAttrs[record.JobName] + } + + for k, v := range row.Data { + if slices.Contains(jobQueryNames, k) { + continue + } + if labelValue, ok := v.(string); ok { + record.Meta[k] = labelValue + } + } + + records = append(records, record) + } + } + resp.Total = len(allJobGroups) + resp.Records = records + + return resp, nil +} diff --git 
a/pkg/query-service/app/inframetrics/statefulsets.go b/pkg/query-service/app/inframetrics/statefulsets.go new file mode 100644 index 0000000000..2d5d6d8313 --- /dev/null +++ b/pkg/query-service/app/inframetrics/statefulsets.go @@ -0,0 +1,444 @@ +package inframetrics + +import ( + "context" + "math" + "sort" + + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" + "go.signoz.io/signoz/pkg/query-service/common" + "go.signoz.io/signoz/pkg/query-service/interfaces" + "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/postprocess" + "golang.org/x/exp/slices" +) + +var ( + metricToUseForStatefulSets = "k8s_pod_cpu_utilization" + k8sStatefulSetNameAttrKey = "k8s_statefulset_name" + + metricNamesForStatefulSets = map[string]string{ + "desired_pods": "k8s_statefulset_desired_pods", + "available_pods": "k8s_statefulset_current_pods", + } + + statefulSetAttrsToEnrich = []string{ + "k8s_statefulset_name", + "k8s_namespace_name", + "k8s_cluster_name", + } + + queryNamesForStatefulSets = map[string][]string{ + "cpu": {"A"}, + "cpu_request": {"B", "A"}, + "cpu_limit": {"C", "A"}, + "memory": {"D"}, + "memory_request": {"E", "D"}, + "memory_limit": {"F", "D"}, + "restarts": {"G", "A"}, + "desired_pods": {"H"}, + "available_pods": {"I"}, + } + + builderQueriesForStatefulSets = map[string]*v3.BuilderQuery{ + // desired pods + "H": { + QueryName: "H", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForStatefulSets["desired_pods"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "H", + ReduceTo: v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + // available pods + "I": { + QueryName: "I", + 
DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForStatefulSets["available_pods"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "I", + ReduceTo: v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + } + + statefulSetQueryNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I"} +) + +type StatefulSetsRepo struct { + reader interfaces.Reader + querierV2 interfaces.Querier +} + +func NewStatefulSetsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *StatefulSetsRepo { + return &StatefulSetsRepo{reader: reader, querierV2: querierV2} +} + +func (d *StatefulSetsRepo) GetStatefulSetAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { + // TODO(srikanthccv): remove hardcoded metric name and support keys from any pod metric + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = metricToUseForStatefulSets + if req.Limit == 0 { + req.Limit = 50 + } + + attributeKeysResponse, err := d.reader.GetMetricAttributeKeys(ctx, &req) + if err != nil { + return nil, err + } + + // TODO(srikanthccv): only return resource attributes when we have a way to + // distinguish between resource attributes and other attributes. 
+ filteredKeys := []v3.AttributeKey{} + for _, key := range attributeKeysResponse.AttributeKeys { + if slices.Contains(pointAttrsToIgnore, key.Key) { + continue + } + filteredKeys = append(filteredKeys, key) + } + + return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil +} + +func (d *StatefulSetsRepo) GetStatefulSetAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = metricToUseForStatefulSets + if req.Limit == 0 { + req.Limit = 50 + } + + attributeValuesResponse, err := d.reader.GetMetricAttributeValues(ctx, &req) + if err != nil { + return nil, err + } + + return attributeValuesResponse, nil +} + +func (d *StatefulSetsRepo) getMetadataAttributes(ctx context.Context, req model.StatefulSetListRequest) (map[string]map[string]string, error) { + statefulSetAttrs := map[string]map[string]string{} + + for _, key := range statefulSetAttrsToEnrich { + hasKey := false + for _, groupByKey := range req.GroupBy { + if groupByKey.Key == key { + hasKey = true + break + } + } + if !hasKey { + req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key}) + } + } + + mq := v3.BuilderQuery{ + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricToUseForStatefulSets, + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + GroupBy: req.GroupBy, + } + + query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq) + if err != nil { + return nil, err + } + + query = localQueryToDistributedQuery(query) + + attrsListResponse, err := d.reader.GetListResultV3(ctx, query) + if err != nil { + return nil, err + } + + for _, row := range attrsListResponse { + stringData := map[string]string{} + for key, value := range row.Data { + if str, ok := value.(string); ok { + stringData[key] = str + } else if strPtr, ok := value.(*string); ok { + stringData[key] = *strPtr + } + } + + 
statefulSetName := stringData[k8sStatefulSetNameAttrKey] + if _, ok := statefulSetAttrs[statefulSetName]; !ok { + statefulSetAttrs[statefulSetName] = map[string]string{} + } + + for _, key := range req.GroupBy { + statefulSetAttrs[statefulSetName][key.Key] = stringData[key.Key] + } + } + + return statefulSetAttrs, nil +} + +func (d *StatefulSetsRepo) getTopStatefulSetGroups(ctx context.Context, req model.StatefulSetListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) { + step, timeSeriesTableName, samplesTableName := getParamsForTopStatefulSets(req) + + queryNames := queryNamesForStatefulSets[req.OrderBy.ColumnName] + topStatefulSetGroupsQueryRangeParams := &v3.QueryRangeParamsV3{ + Start: req.Start, + End: req.End, + Step: step, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{}, + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeTable, + }, + } + + for _, queryName := range queryNames { + query := q.CompositeQuery.BuilderQueries[queryName].Clone() + query.StepInterval = step + query.MetricTableHints = &v3.MetricTableHints{ + TimeSeriesTableName: timeSeriesTableName, + SamplesTableName: samplesTableName, + } + if req.Filters != nil && len(req.Filters.Items) > 0 { + if query.Filters == nil { + query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}} + } + query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) 
+ } + topStatefulSetGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query + } + + queryResponse, _, err := d.querierV2.QueryRange(ctx, topStatefulSetGroupsQueryRangeParams) + if err != nil { + return nil, nil, err + } + formattedResponse, err := postprocess.PostProcessResult(queryResponse, topStatefulSetGroupsQueryRangeParams) + if err != nil { + return nil, nil, err + } + + if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 { + return nil, nil, nil + } + + if req.OrderBy.Order == v3.DirectionDesc { + sort.Slice(formattedResponse[0].Series, func(i, j int) bool { + return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value + }) + } else { + sort.Slice(formattedResponse[0].Series, func(i, j int) bool { + return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value + }) + } + + limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series))) + + paginatedTopStatefulSetGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)] + + topStatefulSetGroups := []map[string]string{} + for _, series := range paginatedTopStatefulSetGroupsSeries { + topStatefulSetGroups = append(topStatefulSetGroups, series.Labels) + } + allStatefulSetGroups := []map[string]string{} + for _, series := range formattedResponse[0].Series { + allStatefulSetGroups = append(allStatefulSetGroups, series.Labels) + } + + return topStatefulSetGroups, allStatefulSetGroups, nil +} + +func (d *StatefulSetsRepo) GetStatefulSetList(ctx context.Context, req model.StatefulSetListRequest) (model.StatefulSetListResponse, error) { + resp := model.StatefulSetListResponse{} + + if req.Limit == 0 { + req.Limit = 10 + } + + if req.OrderBy == nil { + req.OrderBy = &v3.OrderBy{ColumnName: "cpu", Order: v3.DirectionDesc} + } + + if req.GroupBy == nil { + req.GroupBy = []v3.AttributeKey{{Key: k8sStatefulSetNameAttrKey}} + resp.Type = model.ResponseTypeList + } else 
{ + resp.Type = model.ResponseTypeGroupedList + } + + step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60)) + + query := WorkloadTableListQuery.Clone() + + query.Start = req.Start + query.End = req.End + query.Step = step + + // add additional queries for stateful sets + for _, statefulSetQuery := range builderQueriesForStatefulSets { + query.CompositeQuery.BuilderQueries[statefulSetQuery.QueryName] = statefulSetQuery + } + + for _, query := range query.CompositeQuery.BuilderQueries { + query.StepInterval = step + if req.Filters != nil && len(req.Filters.Items) > 0 { + if query.Filters == nil { + query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}} + } + query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) + } + query.GroupBy = req.GroupBy + // make sure we only get records for stateful sets + query.Filters.Items = append(query.Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{Key: k8sStatefulSetNameAttrKey}, + Operator: v3.FilterOperatorExists, + }) + } + + statefulSetAttrs, err := d.getMetadataAttributes(ctx, req) + if err != nil { + return resp, err + } + + topStatefulSetGroups, allStatefulSetGroups, err := d.getTopStatefulSetGroups(ctx, req, query) + if err != nil { + return resp, err + } + + groupFilters := map[string][]string{} + for _, topStatefulSetGroup := range topStatefulSetGroups { + for k, v := range topStatefulSetGroup { + groupFilters[k] = append(groupFilters[k], v) + } + } + + for groupKey, groupValues := range groupFilters { + hasGroupFilter := false + if req.Filters != nil && len(req.Filters.Items) > 0 { + for _, filter := range req.Filters.Items { + if filter.Key.Key == groupKey { + hasGroupFilter = true + break + } + } + } + + if !hasGroupFilter { + for _, query := range query.CompositeQuery.BuilderQueries { + query.Filters.Items = append(query.Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{Key: groupKey}, + Value: groupValues, + Operator: v3.FilterOperatorIn, + }) + 
} + } + } + + queryResponse, _, err := d.querierV2.QueryRange(ctx, query) + if err != nil { + return resp, err + } + + formattedResponse, err := postprocess.PostProcessResult(queryResponse, query) + if err != nil { + return resp, err + } + + records := []model.StatefulSetListRecord{} + + for _, result := range formattedResponse { + for _, row := range result.Table.Rows { + + record := model.StatefulSetListRecord{ + StatefulSetName: "", + CPUUsage: -1, + CPURequest: -1, + CPULimit: -1, + MemoryUsage: -1, + MemoryRequest: -1, + MemoryLimit: -1, + DesiredPods: -1, + AvailablePods: -1, + } + + if statefulSetName, ok := row.Data[k8sStatefulSetNameAttrKey].(string); ok { + record.StatefulSetName = statefulSetName + } + + if cpu, ok := row.Data["A"].(float64); ok { + record.CPUUsage = cpu + } + if cpuRequest, ok := row.Data["B"].(float64); ok { + record.CPURequest = cpuRequest + } + + if cpuLimit, ok := row.Data["C"].(float64); ok { + record.CPULimit = cpuLimit + } + + if memory, ok := row.Data["D"].(float64); ok { + record.MemoryUsage = memory + } + + if memoryRequest, ok := row.Data["E"].(float64); ok { + record.MemoryRequest = memoryRequest + } + + if memoryLimit, ok := row.Data["F"].(float64); ok { + record.MemoryLimit = memoryLimit + } + + if restarts, ok := row.Data["G"].(float64); ok { + record.Restarts = int(restarts) + } + + if desiredPods, ok := row.Data["H"].(float64); ok { + record.DesiredPods = int(desiredPods) + } + + if availablePods, ok := row.Data["I"].(float64); ok { + record.AvailablePods = int(availablePods) + } + + record.Meta = map[string]string{} + if _, ok := statefulSetAttrs[record.StatefulSetName]; ok { + record.Meta = statefulSetAttrs[record.StatefulSetName] + } + + for k, v := range row.Data { + if slices.Contains(statefulSetQueryNames, k) { + continue + } + if labelValue, ok := v.(string); ok { + record.Meta[k] = labelValue + } + } + + records = append(records, record) + } + } + resp.Total = len(allStatefulSetGroups) + resp.Records = records + 
+ return resp, nil +} diff --git a/pkg/query-service/model/infra.go b/pkg/query-service/model/infra.go index 72999b22ae..0c23a0642b 100644 --- a/pkg/query-service/model/infra.go +++ b/pkg/query-service/model/infra.go @@ -233,3 +233,65 @@ type DaemonSetListRecord struct { AvailableNodes int `json:"availableNodes"` Meta map[string]string `json:"meta"` } + +type StatefulSetListRequest struct { + Start int64 `json:"start"` // epoch time in ms + End int64 `json:"end"` // epoch time in ms + Filters *v3.FilterSet `json:"filters"` + GroupBy []v3.AttributeKey `json:"groupBy"` + OrderBy *v3.OrderBy `json:"orderBy"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} + +type StatefulSetListResponse struct { + Type ResponseType `json:"type"` + Records []StatefulSetListRecord `json:"records"` + Total int `json:"total"` +} + +type StatefulSetListRecord struct { + StatefulSetName string `json:"statefulSetName"` + CPUUsage float64 `json:"cpuUsage"` + MemoryUsage float64 `json:"memoryUsage"` + CPURequest float64 `json:"cpuRequest"` + MemoryRequest float64 `json:"memoryRequest"` + CPULimit float64 `json:"cpuLimit"` + MemoryLimit float64 `json:"memoryLimit"` + Restarts int `json:"restarts"` + DesiredPods int `json:"desiredPods"` + AvailablePods int `json:"availablePods"` + Meta map[string]string `json:"meta"` +} + +type JobListRequest struct { + Start int64 `json:"start"` // epoch time in ms + End int64 `json:"end"` // epoch time in ms + Filters *v3.FilterSet `json:"filters"` + GroupBy []v3.AttributeKey `json:"groupBy"` + OrderBy *v3.OrderBy `json:"orderBy"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} + +type JobListResponse struct { + Type ResponseType `json:"type"` + Records []JobListRecord `json:"records"` + Total int `json:"total"` +} + +type JobListRecord struct { + JobName string `json:"jobName"` + CPUUsage float64 `json:"cpuUsage"` + MemoryUsage float64 `json:"memoryUsage"` + CPURequest float64 `json:"cpuRequest"` + MemoryRequest float64 
`json:"memoryRequest"` + CPULimit float64 `json:"cpuLimit"` + MemoryLimit float64 `json:"memoryLimit"` + Restarts int `json:"restarts"` + DesiredSuccessfulPods int `json:"desiredSuccessfulPods"` + ActivePods int `json:"activePods"` + FailedPods int `json:"failedPods"` + SuccessfulPods int `json:"successfulPods"` + Meta map[string]string `json:"meta"` +}