From fd9e9f0fb33eaacdf62397ddea61f24ab58bb8a3 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 12 Nov 2024 20:53:40 +0530 Subject: [PATCH 1/3] chore: add k8s {deployment, daemonset, statefulset, job} resources (#6401) --- pkg/query-service/app/http_handler.go | 33 ++ pkg/query-service/app/infra.go | 210 ++++++++ pkg/query-service/app/inframetrics/common.go | 16 + .../app/inframetrics/daemonsets.go | 444 ++++++++++++++++ .../app/inframetrics/deployments.go | 444 ++++++++++++++++ pkg/query-service/app/inframetrics/jobs.go | 498 ++++++++++++++++++ .../app/inframetrics/statefulsets.go | 444 ++++++++++++++++ .../app/inframetrics/workload_query.go | 166 ++++++ pkg/query-service/model/infra.go | 122 +++++ 9 files changed, 2377 insertions(+) create mode 100644 pkg/query-service/app/inframetrics/daemonsets.go create mode 100644 pkg/query-service/app/inframetrics/deployments.go create mode 100644 pkg/query-service/app/inframetrics/jobs.go create mode 100644 pkg/query-service/app/inframetrics/statefulsets.go create mode 100644 pkg/query-service/app/inframetrics/workload_query.go diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index bbb42effbf..0f6d351af7 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -119,6 +119,11 @@ type APIHandler struct { nodesRepo *inframetrics.NodesRepo namespacesRepo *inframetrics.NamespacesRepo clustersRepo *inframetrics.ClustersRepo + // workloads + deploymentsRepo *inframetrics.DeploymentsRepo + daemonsetsRepo *inframetrics.DaemonSetsRepo + statefulsetsRepo *inframetrics.StatefulSetsRepo + jobsRepo *inframetrics.JobsRepo } type APIHandlerOpts struct { @@ -197,6 +202,10 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { nodesRepo := inframetrics.NewNodesRepo(opts.Reader, querierv2) namespacesRepo := inframetrics.NewNamespacesRepo(opts.Reader, querierv2) clustersRepo := inframetrics.NewClustersRepo(opts.Reader, querierv2) + deploymentsRepo := inframetrics.NewDeploymentsRepo(opts.Reader, querierv2) + daemonsetsRepo := inframetrics.NewDaemonSetsRepo(opts.Reader, querierv2) + statefulsetsRepo := inframetrics.NewStatefulSetsRepo(opts.Reader, querierv2) + jobsRepo := inframetrics.NewJobsRepo(opts.Reader, querierv2) aH := &APIHandler{ reader: opts.Reader, @@ -222,6 +231,10 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { nodesRepo: nodesRepo, namespacesRepo: namespacesRepo, clustersRepo: clustersRepo, + deploymentsRepo: deploymentsRepo, + daemonsetsRepo: daemonsetsRepo, + statefulsetsRepo: statefulsetsRepo, + jobsRepo: jobsRepo, } logsQueryBuilder := logsv3.PrepareLogsQuery @@ -400,6 +413,26 @@ func (aH *APIHandler) RegisterInfraMetricsRoutes(router *mux.Router, am *AuthMid clustersSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getClusterAttributeKeys)).Methods(http.MethodGet) clustersSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getClusterAttributeValues)).Methods(http.MethodGet) clustersSubRouter.HandleFunc("/list", am.ViewAccess(aH.getClusterList)).Methods(http.MethodPost) + + deploymentsSubRouter := router.PathPrefix("/api/v1/deployments").Subrouter() + deploymentsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getDeploymentAttributeKeys)).Methods(http.MethodGet) + deploymentsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getDeploymentAttributeValues)).Methods(http.MethodGet) + deploymentsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getDeploymentList)).Methods(http.MethodPost) + + daemonsetsSubRouter := router.PathPrefix("/api/v1/daemonsets").Subrouter() + daemonsetsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getDaemonSetAttributeKeys)).Methods(http.MethodGet) + daemonsetsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getDaemonSetAttributeValues)).Methods(http.MethodGet) + daemonsetsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getDaemonSetList)).Methods(http.MethodPost) + + statefulsetsSubRouter := router.PathPrefix("/api/v1/statefulsets").Subrouter() + statefulsetsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getStatefulSetAttributeKeys)).Methods(http.MethodGet) + statefulsetsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getStatefulSetAttributeValues)).Methods(http.MethodGet) + statefulsetsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getStatefulSetList)).Methods(http.MethodPost) + + jobsSubRouter := router.PathPrefix("/api/v1/jobs").Subrouter() + jobsSubRouter.HandleFunc("/attribute_keys", am.ViewAccess(aH.getJobAttributeKeys)).Methods(http.MethodGet) + jobsSubRouter.HandleFunc("/attribute_values", am.ViewAccess(aH.getJobAttributeValues)).Methods(http.MethodGet) + jobsSubRouter.HandleFunc("/list", am.ViewAccess(aH.getJobList)).Methods(http.MethodPost) } func (aH *APIHandler) RegisterWebSocketPaths(router *mux.Router, am *AuthMiddleware) { diff --git a/pkg/query-service/app/infra.go b/pkg/query-service/app/infra.go index 73d10bdddb..b1f741e244 100644 --- a/pkg/query-service/app/infra.go +++ b/pkg/query-service/app/infra.go @@ -334,3 +334,213 @@ func (aH *APIHandler) getClusterList(w http.ResponseWriter, r *http.Request) { aH.Respond(w, clusterList) } + +func (aH *APIHandler) getDeploymentAttributeKeys(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := parseFilterAttributeKeyRequest(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + keys, err := aH.deploymentsRepo.GetDeploymentAttributeKeys(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, keys) +} + +func (aH *APIHandler) getDeploymentAttributeValues(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := parseFilterAttributeValueRequest(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + values, err := aH.deploymentsRepo.GetDeploymentAttributeValues(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, values) +} + +func (aH *APIHandler) getDeploymentList(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req := model.DeploymentListRequest{} + + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + deploymentList, err := aH.deploymentsRepo.GetDeploymentList(ctx, req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, deploymentList) +} + +func (aH *APIHandler) getDaemonSetAttributeKeys(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := parseFilterAttributeKeyRequest(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + keys, err := aH.daemonsetsRepo.GetDaemonSetAttributeKeys(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, keys) +} + +func (aH *APIHandler) getDaemonSetAttributeValues(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := parseFilterAttributeValueRequest(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + values, err := aH.daemonsetsRepo.GetDaemonSetAttributeValues(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, values) +} + +func (aH *APIHandler) getDaemonSetList(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req := model.DaemonSetListRequest{} + + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + daemonSetList, err := aH.daemonsetsRepo.GetDaemonSetList(ctx, req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, daemonSetList) +} + +func (aH *APIHandler) getStatefulSetAttributeKeys(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := parseFilterAttributeKeyRequest(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + keys, err := aH.statefulsetsRepo.GetStatefulSetAttributeKeys(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, keys) +} + +func (aH *APIHandler) getStatefulSetAttributeValues(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := parseFilterAttributeValueRequest(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + values, err := aH.statefulsetsRepo.GetStatefulSetAttributeValues(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, values) +} + +func (aH *APIHandler) getStatefulSetList(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req := model.StatefulSetListRequest{} + + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + statefulSetList, err := aH.statefulsetsRepo.GetStatefulSetList(ctx, req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, statefulSetList) +} + +func (aH *APIHandler) getJobAttributeKeys(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := parseFilterAttributeKeyRequest(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + keys, err := aH.jobsRepo.GetJobAttributeKeys(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + aH.Respond(w, keys) +} + +func (aH *APIHandler) getJobAttributeValues(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req, err := parseFilterAttributeValueRequest(r) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + values, err := aH.jobsRepo.GetJobAttributeValues(ctx, *req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + aH.Respond(w, values) +} + +func (aH *APIHandler) getJobList(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + req := model.JobListRequest{} + + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + jobList, err := aH.jobsRepo.GetJobList(ctx, req) + if err != nil { + RespondError(w, &model.ApiError{Typ: model.ErrorInternal, Err: err}, nil) + return + } + + aH.Respond(w, jobList) +} diff --git a/pkg/query-service/app/inframetrics/common.go b/pkg/query-service/app/inframetrics/common.go index 7cde41185e..c4c280cb98 100644 --- a/pkg/query-service/app/inframetrics/common.go +++ b/pkg/query-service/app/inframetrics/common.go @@ -73,6 +73,22 @@ func getParamsForTopClusters(req model.ClusterListRequest) (int64, string, strin return getParamsForTopItems(req.Start, req.End) } +func getParamsForTopDeployments(req model.DeploymentListRequest) (int64, string, string) { + return getParamsForTopItems(req.Start, req.End) +} + +func getParamsForTopDaemonSets(req model.DaemonSetListRequest) (int64, string, string) { + return getParamsForTopItems(req.Start, req.End) +} + +func getParamsForTopStatefulSets(req model.StatefulSetListRequest) (int64, string, string) { + return getParamsForTopItems(req.Start, req.End) +} + +func getParamsForTopJobs(req model.JobListRequest) (int64, string, string) { + return getParamsForTopItems(req.Start, req.End) +} + // TODO(srikanthccv): remove this // What is happening here? // The `PrepareTimeseriesFilterQuery` uses the local time series table for sub-query because each fingerprint diff --git a/pkg/query-service/app/inframetrics/daemonsets.go b/pkg/query-service/app/inframetrics/daemonsets.go new file mode 100644 index 0000000000..735d52d2a0 --- /dev/null +++ b/pkg/query-service/app/inframetrics/daemonsets.go @@ -0,0 +1,444 @@ +package inframetrics + +import ( + "context" + "math" + "sort" + + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" + "go.signoz.io/signoz/pkg/query-service/common" + "go.signoz.io/signoz/pkg/query-service/interfaces" + "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/postprocess" + "golang.org/x/exp/slices" +) + +var ( + metricToUseForDaemonSets = "k8s_pod_cpu_utilization" + k8sDaemonSetNameAttrKey = "k8s_daemonset_name" + + metricNamesForDaemonSets = map[string]string{ + "desired_nodes": "k8s_daemonset_desired_scheduled_nodes", + "available_nodes": "k8s_daemonset_current_scheduled_nodes", + } + + daemonSetAttrsToEnrich = []string{ + "k8s_daemonset_name", + "k8s_namespace_name", + "k8s_cluster_name", + } + + queryNamesForDaemonSets = map[string][]string{ + "cpu": {"A"}, + "cpu_request": {"B", "A"}, + "cpu_limit": {"C", "A"}, + "memory": {"D"}, + "memory_request": {"E", "D"}, + "memory_limit": {"F", "D"}, + "restarts": {"G", "A"}, + "desired_nodes": {"H"}, + "available_nodes": {"I"}, + } + + builderQueriesForDaemonSets = map[string]*v3.BuilderQuery{ + // desired nodes + "H": { + QueryName: "H", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForDaemonSets["desired_nodes"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "H", + ReduceTo: v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + // available nodes + "I": { + QueryName: "I", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForDaemonSets["available_nodes"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "I", + ReduceTo: v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + } + + daemonSetQueryNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I"} +) + +type DaemonSetsRepo struct { + reader interfaces.Reader + querierV2 interfaces.Querier +} + +func NewDaemonSetsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *DaemonSetsRepo { + return &DaemonSetsRepo{reader: reader, querierV2: querierV2} +} + +func (d *DaemonSetsRepo) GetDaemonSetAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { + // TODO(srikanthccv): remove hardcoded metric name and support keys from any pod metric + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = metricToUseForDaemonSets + if req.Limit == 0 { + req.Limit = 50 + } + + attributeKeysResponse, err := d.reader.GetMetricAttributeKeys(ctx, &req) + if err != nil { + return nil, err + } + + // TODO(srikanthccv): only return resource attributes when we have a way to + // distinguish between resource attributes and other attributes. + filteredKeys := []v3.AttributeKey{} + for _, key := range attributeKeysResponse.AttributeKeys { + if slices.Contains(pointAttrsToIgnore, key.Key) { + continue + } + filteredKeys = append(filteredKeys, key) + } + + return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil +} + +func (d *DaemonSetsRepo) GetDaemonSetAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = metricToUseForDaemonSets + if req.Limit == 0 { + req.Limit = 50 + } + + attributeValuesResponse, err := d.reader.GetMetricAttributeValues(ctx, &req) + if err != nil { + return nil, err + } + + return attributeValuesResponse, nil +} + +func (d *DaemonSetsRepo) getMetadataAttributes(ctx context.Context, req model.DaemonSetListRequest) (map[string]map[string]string, error) { + daemonSetAttrs := map[string]map[string]string{} + + for _, key := range daemonSetAttrsToEnrich { + hasKey := false + for _, groupByKey := range req.GroupBy { + if groupByKey.Key == key { + hasKey = true + break + } + } + if !hasKey { + req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key}) + } + } + + mq := v3.BuilderQuery{ + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricToUseForDaemonSets, + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + GroupBy: req.GroupBy, + } + + query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq) + if err != nil { + return nil, err + } + + query = localQueryToDistributedQuery(query) + + attrsListResponse, err := d.reader.GetListResultV3(ctx, query) + if err != nil { + return nil, err + } + + for _, row := range attrsListResponse { + stringData := map[string]string{} + for key, value := range row.Data { + if str, ok := value.(string); ok { + stringData[key] = str + } else if strPtr, ok := value.(*string); ok { + stringData[key] = *strPtr + } + } + + daemonSetName := stringData[k8sDaemonSetNameAttrKey] + if _, ok := daemonSetAttrs[daemonSetName]; !ok { + daemonSetAttrs[daemonSetName] = map[string]string{} + } + + for _, key := range req.GroupBy { + daemonSetAttrs[daemonSetName][key.Key] = stringData[key.Key] + } + } + + return daemonSetAttrs, nil +} + +func (d *DaemonSetsRepo) getTopDaemonSetGroups(ctx context.Context, req model.DaemonSetListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) { + step, timeSeriesTableName, samplesTableName := getParamsForTopDaemonSets(req) + + queryNames := queryNamesForDaemonSets[req.OrderBy.ColumnName] + topDaemonSetGroupsQueryRangeParams := &v3.QueryRangeParamsV3{ + Start: req.Start, + End: req.End, + Step: step, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{}, + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeTable, + }, + } + + for _, queryName := range queryNames { + query := q.CompositeQuery.BuilderQueries[queryName].Clone() + query.StepInterval = step + query.MetricTableHints = &v3.MetricTableHints{ + TimeSeriesTableName: timeSeriesTableName, + SamplesTableName: samplesTableName, + } + if req.Filters != nil && len(req.Filters.Items) > 0 { + if query.Filters == nil { + query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}} + } + query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) + } + topDaemonSetGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query + } + + queryResponse, _, err := d.querierV2.QueryRange(ctx, topDaemonSetGroupsQueryRangeParams) + if err != nil { + return nil, nil, err + } + formattedResponse, err := postprocess.PostProcessResult(queryResponse, topDaemonSetGroupsQueryRangeParams) + if err != nil { + return nil, nil, err + } + + if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 { + return nil, nil, nil + } + + if req.OrderBy.Order == v3.DirectionDesc { + sort.Slice(formattedResponse[0].Series, func(i, j int) bool { + return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value + }) + } else { + sort.Slice(formattedResponse[0].Series, func(i, j int) bool { + return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value + }) + } + + limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series))) + + paginatedTopDaemonSetGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)] + + topDaemonSetGroups := []map[string]string{} + for _, series := range paginatedTopDaemonSetGroupsSeries { + topDaemonSetGroups = append(topDaemonSetGroups, series.Labels) + } + allDaemonSetGroups := []map[string]string{} + for _, series := range formattedResponse[0].Series { + allDaemonSetGroups = append(allDaemonSetGroups, series.Labels) + } + + return topDaemonSetGroups, allDaemonSetGroups, nil +} + +func (d *DaemonSetsRepo) GetDaemonSetList(ctx context.Context, req model.DaemonSetListRequest) (model.DaemonSetListResponse, error) { + resp := model.DaemonSetListResponse{} + + if req.Limit == 0 { + req.Limit = 10 + } + + if req.OrderBy == nil { + req.OrderBy = &v3.OrderBy{ColumnName: "cpu", Order: v3.DirectionDesc} + } + + if req.GroupBy == nil { + req.GroupBy = []v3.AttributeKey{{Key: k8sDaemonSetNameAttrKey}} + resp.Type = model.ResponseTypeList + } else { + resp.Type = model.ResponseTypeGroupedList + } + + step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60)) + + query := WorkloadTableListQuery.Clone() + + query.Start = req.Start + query.End = req.End + query.Step = step + + // add additional queries for daemon sets + for _, daemonSetQuery := range builderQueriesForDaemonSets { + query.CompositeQuery.BuilderQueries[daemonSetQuery.QueryName] = daemonSetQuery + } + + for _, query := range query.CompositeQuery.BuilderQueries { + query.StepInterval = step + if req.Filters != nil && len(req.Filters.Items) > 0 { + if query.Filters == nil { + query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}} + } + query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) + } + query.GroupBy = req.GroupBy + // make sure we only get records for daemon sets + query.Filters.Items = append(query.Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{Key: k8sDaemonSetNameAttrKey}, + Operator: v3.FilterOperatorExists, + }) + } + + daemonSetAttrs, err := d.getMetadataAttributes(ctx, req) + if err != nil { + return resp, err + } + + topDaemonSetGroups, allDaemonSetGroups, err := d.getTopDaemonSetGroups(ctx, req, query) + if err != nil { + return resp, err + } + + groupFilters := map[string][]string{} + for _, topDaemonSetGroup := range topDaemonSetGroups { + for k, v := range topDaemonSetGroup { + groupFilters[k] = append(groupFilters[k], v) + } + } + + for groupKey, groupValues := range groupFilters { + hasGroupFilter := false + if req.Filters != nil && len(req.Filters.Items) > 0 { + for _, filter := range req.Filters.Items { + if filter.Key.Key == groupKey { + hasGroupFilter = true + break + } + } + } + + if !hasGroupFilter { + for _, query := range query.CompositeQuery.BuilderQueries { + query.Filters.Items = append(query.Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{Key: groupKey}, + Value: groupValues, + Operator: v3.FilterOperatorIn, + }) + } + } + } + + queryResponse, _, err := d.querierV2.QueryRange(ctx, query) + if err != nil { + return resp, err + } + + formattedResponse, err := postprocess.PostProcessResult(queryResponse, query) + if err != nil { + return resp, err + } + + records := []model.DaemonSetListRecord{} + + for _, result := range formattedResponse { + for _, row := range result.Table.Rows { + + record := model.DaemonSetListRecord{ + DaemonSetName: "", + CPUUsage: -1, + CPURequest: -1, + CPULimit: -1, + MemoryUsage: -1, + MemoryRequest: -1, + MemoryLimit: -1, + DesiredNodes: -1, + AvailableNodes: -1, + } + + if daemonSetName, ok := row.Data[k8sDaemonSetNameAttrKey].(string); ok { + record.DaemonSetName = daemonSetName + } + + if cpu, ok := row.Data["A"].(float64); ok { + record.CPUUsage = cpu + } + if cpuRequest, ok := row.Data["B"].(float64); ok { + record.CPURequest = cpuRequest + } + + if cpuLimit, ok := row.Data["C"].(float64); ok { + record.CPULimit = cpuLimit + } + + if memory, ok := row.Data["D"].(float64); ok { + record.MemoryUsage = memory + } + + if memoryRequest, ok := row.Data["E"].(float64); ok { + record.MemoryRequest = memoryRequest + } + + if memoryLimit, ok := row.Data["F"].(float64); ok { + record.MemoryLimit = memoryLimit + } + + if restarts, ok := row.Data["G"].(float64); ok { + record.Restarts = int(restarts) + } + + if desiredNodes, ok := row.Data["H"].(float64); ok { + record.DesiredNodes = int(desiredNodes) + } + + if availableNodes, ok := row.Data["I"].(float64); ok { + record.AvailableNodes = int(availableNodes) + } + + record.Meta = map[string]string{} + if _, ok := daemonSetAttrs[record.DaemonSetName]; ok { + record.Meta = daemonSetAttrs[record.DaemonSetName] + } + + for k, v := range row.Data { + if slices.Contains(daemonSetQueryNames, k) { + continue + } + if labelValue, ok := v.(string); ok { + record.Meta[k] = labelValue + } + } + + records = append(records, record) + } + } + resp.Total = len(allDaemonSetGroups) + resp.Records = records + + return resp, nil +} diff --git a/pkg/query-service/app/inframetrics/deployments.go b/pkg/query-service/app/inframetrics/deployments.go new file mode 100644 index 0000000000..aed8de1929 --- /dev/null +++ b/pkg/query-service/app/inframetrics/deployments.go @@ -0,0 +1,444 @@ +package inframetrics + +import ( + "context" + "math" + "sort" + + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" + "go.signoz.io/signoz/pkg/query-service/common" + "go.signoz.io/signoz/pkg/query-service/interfaces" + "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/postprocess" + "golang.org/x/exp/slices" +) + +var ( + metricToUseForDeployments = "k8s_pod_cpu_utilization" + k8sDeploymentNameAttrKey = "k8s_deployment_name" + + metricNamesForDeployments = map[string]string{ + "desired_pods": "k8s_deployment_desired", + "available_pods": "k8s_deployment_available", + } + + deploymentAttrsToEnrich = []string{ + "k8s_deployment_name", + "k8s_namespace_name", + "k8s_cluster_name", + } + + queryNamesForDeployments = map[string][]string{ + "cpu": {"A"}, + "cpu_request": {"B", "A"}, + "cpu_limit": {"C", "A"}, + "memory": {"D"}, + "memory_request": {"E", "D"}, + "memory_limit": {"F", "D"}, + "restarts": {"G", "A"}, + "desired_pods": {"H"}, + "available_pods": {"I"}, + } + + builderQueriesForDeployments = map[string]*v3.BuilderQuery{ + // desired pods + "H": { + QueryName: "H", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForDeployments["desired_pods"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "H", + ReduceTo: v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + // available pods + "I": { + QueryName: "I", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForDeployments["available_pods"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "I", + ReduceTo: v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + } + + deploymentQueryNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I"} +) + +type DeploymentsRepo struct { + reader interfaces.Reader + querierV2 interfaces.Querier +} + +func NewDeploymentsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *DeploymentsRepo { + return &DeploymentsRepo{reader: reader, querierV2: querierV2} +} + +func (d *DeploymentsRepo) GetDeploymentAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { + // TODO(srikanthccv): remove hardcoded metric name and support keys from any pod metric + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = metricToUseForDeployments + if req.Limit == 0 { + req.Limit = 50 + } + + attributeKeysResponse, err := d.reader.GetMetricAttributeKeys(ctx, &req) + if err != nil { + return nil, err + } + + // TODO(srikanthccv): only return resource attributes when we have a way to + // distinguish between resource attributes and other attributes. + filteredKeys := []v3.AttributeKey{} + for _, key := range attributeKeysResponse.AttributeKeys { + if slices.Contains(pointAttrsToIgnore, key.Key) { + continue + } + filteredKeys = append(filteredKeys, key) + } + + return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil +} + +func (d *DeploymentsRepo) GetDeploymentAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = metricToUseForDeployments + if req.Limit == 0 { + req.Limit = 50 + } + + attributeValuesResponse, err := d.reader.GetMetricAttributeValues(ctx, &req) + if err != nil { + return nil, err + } + + return attributeValuesResponse, nil +} + +func (d *DeploymentsRepo) getMetadataAttributes(ctx context.Context, req model.DeploymentListRequest) (map[string]map[string]string, error) { + deploymentAttrs := map[string]map[string]string{} + + for _, key := range deploymentAttrsToEnrich { + hasKey := false + for _, groupByKey := range req.GroupBy { + if groupByKey.Key == key { + hasKey = true + break + } + } + if !hasKey { + req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key}) + } + } + + mq := v3.BuilderQuery{ + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricToUseForDeployments, + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + GroupBy: req.GroupBy, + } + + query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq) + if err != nil { + return nil, err + } + + query = localQueryToDistributedQuery(query) + + attrsListResponse, err := d.reader.GetListResultV3(ctx, query) + if err != nil { + return nil, err + } + + for _, row := range attrsListResponse { + stringData := map[string]string{} + for key, value := range row.Data { + if str, ok := value.(string); ok { + stringData[key] = str + } else if strPtr, ok := value.(*string); ok { + stringData[key] = *strPtr + } + } + + deploymentName := stringData[k8sDeploymentNameAttrKey] + if _, ok := deploymentAttrs[deploymentName]; !ok { + deploymentAttrs[deploymentName] = map[string]string{} + } + + for _, key := range req.GroupBy { + deploymentAttrs[deploymentName][key.Key] = stringData[key.Key] + } + } + + return deploymentAttrs, nil +} + +func (d *DeploymentsRepo) getTopDeploymentGroups(ctx context.Context, req model.DeploymentListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) { + step, timeSeriesTableName, samplesTableName := getParamsForTopDeployments(req) + + queryNames := queryNamesForDeployments[req.OrderBy.ColumnName] + topDeploymentGroupsQueryRangeParams := &v3.QueryRangeParamsV3{ + Start: req.Start, + End: req.End, + Step: step, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{}, + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeTable, + }, + } + + for _, queryName := range queryNames { + query := q.CompositeQuery.BuilderQueries[queryName].Clone() + query.StepInterval = step + query.MetricTableHints = &v3.MetricTableHints{ + TimeSeriesTableName: timeSeriesTableName, + SamplesTableName: samplesTableName, + } + if req.Filters != nil && len(req.Filters.Items) > 0 { + if query.Filters == nil { + query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}} + } + query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) + } + topDeploymentGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query + } + + queryResponse, _, err := d.querierV2.QueryRange(ctx, topDeploymentGroupsQueryRangeParams) + if err != nil { + return nil, nil, err + } + formattedResponse, err := postprocess.PostProcessResult(queryResponse, topDeploymentGroupsQueryRangeParams) + if err != nil { + return nil, nil, err + } + + if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 { + return nil, nil, nil + } + + if req.OrderBy.Order == v3.DirectionDesc { + sort.Slice(formattedResponse[0].Series, func(i, j int) bool { + return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value + }) + } else { + sort.Slice(formattedResponse[0].Series, func(i, j int) bool { + return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value + }) + } + + limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series))) + + paginatedTopDeploymentGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)] + + topDeploymentGroups := []map[string]string{} + for _, series := range paginatedTopDeploymentGroupsSeries { + topDeploymentGroups = append(topDeploymentGroups, series.Labels) + } + allDeploymentGroups := []map[string]string{} + for _, series := range formattedResponse[0].Series { + allDeploymentGroups = append(allDeploymentGroups, series.Labels) + } + + return topDeploymentGroups, allDeploymentGroups, nil +} + +func (d *DeploymentsRepo) GetDeploymentList(ctx context.Context, req model.DeploymentListRequest) (model.DeploymentListResponse, error) { + resp := model.DeploymentListResponse{} + + if req.Limit == 0 { + req.Limit = 10 + } + + if req.OrderBy == nil { + req.OrderBy = &v3.OrderBy{ColumnName: "cpu", Order: v3.DirectionDesc} + } + + if req.GroupBy == nil { + req.GroupBy = []v3.AttributeKey{{Key: k8sDeploymentNameAttrKey}} + resp.Type = model.ResponseTypeList + } else { + resp.Type = model.ResponseTypeGroupedList + } + + step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60)) + + query := WorkloadTableListQuery.Clone() + + query.Start = req.Start + query.End = req.End + query.Step = step + + // add additional queries for deployments + for _, deploymentQuery := range builderQueriesForDeployments { + query.CompositeQuery.BuilderQueries[deploymentQuery.QueryName] = deploymentQuery + } + + for _, query := range query.CompositeQuery.BuilderQueries { + query.StepInterval = step + if req.Filters != nil && len(req.Filters.Items) > 0 { + if query.Filters == nil { + query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}} + } + query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) + } + query.GroupBy = req.GroupBy + // make sure we only get records for deployments + query.Filters.Items = append(query.Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{Key: k8sDeploymentNameAttrKey}, + Operator: v3.FilterOperatorExists, + }) + } + + deploymentAttrs, err := d.getMetadataAttributes(ctx, req) + if err != nil { + return resp, err + } + + topDeploymentGroups, allDeploymentGroups, err := d.getTopDeploymentGroups(ctx, req, query) + if err != nil { + return resp, err + } + + groupFilters := map[string][]string{} + for _, topDeploymentGroup := range topDeploymentGroups { + for k, v := range topDeploymentGroup { + groupFilters[k] = append(groupFilters[k], v) + } + } + + for groupKey, groupValues := range groupFilters { + hasGroupFilter := false + if req.Filters != nil && len(req.Filters.Items) > 0 { + for _, filter := range req.Filters.Items { + if filter.Key.Key == groupKey { + hasGroupFilter = true + break + } + } + } + + if !hasGroupFilter { + for _, query := range query.CompositeQuery.BuilderQueries { + query.Filters.Items = append(query.Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{Key: groupKey}, + Value: groupValues, + Operator: v3.FilterOperatorIn, + }) + } + } + } + + queryResponse, _, err := d.querierV2.QueryRange(ctx, query) + if err != nil { + return resp, err + } + + formattedResponse, err := postprocess.PostProcessResult(queryResponse, query) + if err != nil { + return resp, err + } + + records := []model.DeploymentListRecord{} + + for _, result := range formattedResponse { + for _, row := range result.Table.Rows { + + record := model.DeploymentListRecord{ + DeploymentName: "", + CPUUsage: -1, + CPURequest: -1, + CPULimit: -1, + MemoryUsage: -1, + MemoryRequest: -1, + MemoryLimit: -1, + DesiredPods: -1, + AvailablePods: -1, + } + + if deploymentName, ok := row.Data[k8sDeploymentNameAttrKey].(string); ok { + record.DeploymentName = deploymentName + } + + if cpu, ok := row.Data["A"].(float64); ok { + record.CPUUsage = cpu + } + if cpuRequest, ok := row.Data["B"].(float64); ok { + record.CPURequest = cpuRequest + } + + if cpuLimit, ok := row.Data["C"].(float64); ok { + record.CPULimit = cpuLimit + } + + if memory, ok := row.Data["D"].(float64); ok { + record.MemoryUsage = memory + } + + if memoryRequest, ok := row.Data["E"].(float64); ok { + record.MemoryRequest = memoryRequest + } + + if memoryLimit, ok := row.Data["F"].(float64); ok { + record.MemoryLimit = memoryLimit + } + + if restarts, ok := row.Data["G"].(float64); ok { + record.Restarts = int(restarts) + } + + if desiredPods, ok := row.Data["H"].(float64); ok { + record.DesiredPods = int(desiredPods) + } + + if availablePods, ok := row.Data["I"].(float64); ok { + record.AvailablePods = int(availablePods) + } + + record.Meta = map[string]string{} + if _, ok := deploymentAttrs[record.DeploymentName]; ok { + record.Meta = deploymentAttrs[record.DeploymentName] + } + + for k, v := range row.Data { + if slices.Contains(deploymentQueryNames, k) { + continue + } + if labelValue, ok := v.(string); ok { + record.Meta[k] = labelValue + } + } + + records = append(records, record) + } + } + resp.Total = len(allDeploymentGroups) + resp.Records = records + + return resp, nil +} diff --git a/pkg/query-service/app/inframetrics/jobs.go b/pkg/query-service/app/inframetrics/jobs.go new file mode 100644 index 0000000000..42300f0b87 --- /dev/null +++ b/pkg/query-service/app/inframetrics/jobs.go @@ -0,0 +1,498 @@ +package inframetrics + +import ( + "context" + "math" + "sort" + + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" + "go.signoz.io/signoz/pkg/query-service/common" + "go.signoz.io/signoz/pkg/query-service/interfaces" + "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/postprocess" + "golang.org/x/exp/slices" +) + +var ( + metricToUseForJobs = "k8s_pod_cpu_utilization" + k8sJobNameAttrKey = "k8s_job_name" + + metricNamesForJobs = map[string]string{ + "desired_successful_pods": "k8s_job_desired_successful_pods", + "active_pods": "k8s_job_active_pods", + "failed_pods": "k8s_job_failed_pods", + "successful_pods": "k8s_job_successful_pods", + } + + jobAttrsToEnrich = []string{ + "k8s_job_name", + "k8s_namespace_name", + "k8s_cluster_name", + } + + queryNamesForJobs = map[string][]string{ + "cpu": {"A"}, + "cpu_request": {"B", "A"}, + "cpu_limit": {"C", "A"}, + "memory": {"D"}, + "memory_request": {"E", "D"}, + "memory_limit": {"F", "D"}, + "restarts": {"G", "A"}, + "desired_pods": {"H"}, + "active_pods": {"I"}, + "failed_pods": {"J"}, + "successful_pods": {"K"}, + } + + builderQueriesForJobs = map[string]*v3.BuilderQuery{ + // desired nodes + "H": { + QueryName: "H", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForJobs["desired_successful_pods"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "H", + ReduceTo: v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + // available nodes + "I": { + QueryName: "I", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForJobs["active_pods"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "I", + ReduceTo: v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + // failed pods + "J": { + QueryName: "J", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForJobs["failed_pods"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "J", + ReduceTo: v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + // successful pods + "K": { + QueryName: "K", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForJobs["successful_pods"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "K", + ReduceTo: v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + } + + jobQueryNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "K"} +) + +type JobsRepo struct { + reader interfaces.Reader + querierV2 interfaces.Querier +} + +func NewJobsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *JobsRepo { + return &JobsRepo{reader: reader, querierV2: querierV2} +} + +func (d *JobsRepo) GetJobAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { + // TODO(srikanthccv): remove hardcoded metric name and support keys from any pod metric + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = metricToUseForJobs + if req.Limit == 0 { + req.Limit = 50 + } + + attributeKeysResponse, err := d.reader.GetMetricAttributeKeys(ctx, &req) + if err != nil { + return nil, err + } + + // TODO(srikanthccv): only return resource attributes when we have a way to + // distinguish between resource attributes and other attributes. + filteredKeys := []v3.AttributeKey{} + for _, key := range attributeKeysResponse.AttributeKeys { + if slices.Contains(pointAttrsToIgnore, key.Key) { + continue + } + filteredKeys = append(filteredKeys, key) + } + + return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil +} + +func (d *JobsRepo) GetJobAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = metricToUseForJobs + if req.Limit == 0 { + req.Limit = 50 + } + + attributeValuesResponse, err := d.reader.GetMetricAttributeValues(ctx, &req) + if err != nil { + return nil, err + } + + return attributeValuesResponse, nil +} + +func (d *JobsRepo) getMetadataAttributes(ctx context.Context, req model.JobListRequest) (map[string]map[string]string, error) { + jobAttrs := map[string]map[string]string{} + + for _, key := range jobAttrsToEnrich { + hasKey := false + for _, groupByKey := range req.GroupBy { + if groupByKey.Key == key { + hasKey = true + break + } + } + if !hasKey { + req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key}) + } + } + + mq := v3.BuilderQuery{ + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricToUseForJobs, + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + GroupBy: req.GroupBy, + } + + query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq) + if err != nil { + return nil, err + } + + query = localQueryToDistributedQuery(query) + + attrsListResponse, err := d.reader.GetListResultV3(ctx, query) + if err != nil { + return nil, err + } + + for _, row := range attrsListResponse { + stringData := map[string]string{} + for key, value := range row.Data { + if str, ok := value.(string); ok { + stringData[key] = str + } else if strPtr, ok := value.(*string); ok { + stringData[key] = *strPtr + } + } + + jobName := stringData[k8sJobNameAttrKey] + if _, ok := jobAttrs[jobName]; !ok { + jobAttrs[jobName] = map[string]string{} + } + + for _, key := range req.GroupBy { + jobAttrs[jobName][key.Key] = stringData[key.Key] + } + } + + return jobAttrs, nil +} + +func (d *JobsRepo) getTopJobGroups(ctx context.Context, req model.JobListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) { + step, timeSeriesTableName, samplesTableName := getParamsForTopJobs(req) + + queryNames := queryNamesForJobs[req.OrderBy.ColumnName] + topJobGroupsQueryRangeParams := &v3.QueryRangeParamsV3{ + Start: req.Start, + End: req.End, + Step: step, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{}, + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeTable, + }, + } + + for _, queryName := range queryNames { + query := q.CompositeQuery.BuilderQueries[queryName].Clone() + query.StepInterval = step + query.MetricTableHints = &v3.MetricTableHints{ + TimeSeriesTableName: timeSeriesTableName, + SamplesTableName: samplesTableName, + } + if req.Filters != nil && len(req.Filters.Items) > 0 { + if query.Filters == nil { + query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}} + } + query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) + } + topJobGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query + } + + queryResponse, _, err := d.querierV2.QueryRange(ctx, topJobGroupsQueryRangeParams) + if err != nil { + return nil, nil, err + } + formattedResponse, err := postprocess.PostProcessResult(queryResponse, topJobGroupsQueryRangeParams) + if err != nil { + return nil, nil, err + } + + if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 { + return nil, nil, nil + } + + if req.OrderBy.Order == v3.DirectionDesc { + sort.Slice(formattedResponse[0].Series, func(i, j int) bool { + return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value + }) + } else { + sort.Slice(formattedResponse[0].Series, func(i, j int) bool { + return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value + }) + } + + limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series))) + + paginatedTopJobGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)] + + topJobGroups := []map[string]string{} + for _, series := range paginatedTopJobGroupsSeries { + topJobGroups = append(topJobGroups, series.Labels) + } + allJobGroups := []map[string]string{} + for _, series := range formattedResponse[0].Series { + allJobGroups = append(allJobGroups, series.Labels) + } + + return topJobGroups, allJobGroups, nil +} + +func (d *JobsRepo) GetJobList(ctx context.Context, req model.JobListRequest) (model.JobListResponse, error) { + resp := model.JobListResponse{} + + if req.Limit == 0 { + req.Limit = 10 + } + + if req.OrderBy == nil { + req.OrderBy = &v3.OrderBy{ColumnName: "desired_pods", Order: v3.DirectionDesc} + } + + if req.GroupBy == nil { + req.GroupBy = []v3.AttributeKey{{Key: k8sJobNameAttrKey}} + resp.Type = model.ResponseTypeList + } else { + resp.Type = model.ResponseTypeGroupedList + } + + step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60)) + + query := WorkloadTableListQuery.Clone() + + query.Start = req.Start + query.End = req.End + query.Step = step + + // add additional queries for jobs + for _, jobQuery := range builderQueriesForJobs { + query.CompositeQuery.BuilderQueries[jobQuery.QueryName] = jobQuery + } + + for _, query := range query.CompositeQuery.BuilderQueries { + query.StepInterval = step + if req.Filters != nil && len(req.Filters.Items) > 0 { + if query.Filters == nil { + query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}} + } + query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) + } + query.GroupBy = req.GroupBy + // make sure we only get records for jobs + query.Filters.Items = append(query.Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{Key: k8sJobNameAttrKey}, + Operator: v3.FilterOperatorExists, + }) + } + + jobAttrs, err := d.getMetadataAttributes(ctx, req) + if err != nil { + return resp, err + } + + topJobGroups, allJobGroups, err := d.getTopJobGroups(ctx, req, query) + if err != nil { + return resp, err + } + + groupFilters := map[string][]string{} + for _, topJobGroup := range topJobGroups { + for k, v := range topJobGroup { + groupFilters[k] = append(groupFilters[k], v) + } + } + + for groupKey, groupValues := range groupFilters { + hasGroupFilter := false + if req.Filters != nil && len(req.Filters.Items) > 0 { + for _, filter := range req.Filters.Items { + if filter.Key.Key == groupKey { + hasGroupFilter = true + break + } + } + } + + if !hasGroupFilter { + for _, query := range query.CompositeQuery.BuilderQueries { + query.Filters.Items = append(query.Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{Key: groupKey}, + Value: groupValues, + Operator: v3.FilterOperatorIn, + }) + } + } + } + + queryResponse, _, err := d.querierV2.QueryRange(ctx, query) + if err != nil { + return resp, err + } + + formattedResponse, err := postprocess.PostProcessResult(queryResponse, query) + if err != nil { + return resp, err + } + + records := []model.JobListRecord{} + + for _, result := range formattedResponse { + for _, row := range result.Table.Rows { + + record := model.JobListRecord{ + JobName: "", + CPUUsage: -1, + CPURequest: -1, + CPULimit: -1, + MemoryUsage: -1, + MemoryRequest: -1, + MemoryLimit: -1, + DesiredSuccessfulPods: -1, + ActivePods: -1, + FailedPods: -1, + SuccessfulPods: -1, + } + + if jobName, ok := row.Data[k8sJobNameAttrKey].(string); ok { + record.JobName = jobName + } + + if cpu, ok := row.Data["A"].(float64); ok { + record.CPUUsage = cpu + } + if cpuRequest, ok := row.Data["B"].(float64); ok { + record.CPURequest = cpuRequest + } + + if cpuLimit, ok := row.Data["C"].(float64); ok { + record.CPULimit = cpuLimit + } + + if memory, ok := row.Data["D"].(float64); ok { + record.MemoryUsage = memory + } + + if memoryRequest, ok := row.Data["E"].(float64); ok { + record.MemoryRequest = memoryRequest + } + + if memoryLimit, ok := row.Data["F"].(float64); ok { + record.MemoryLimit = memoryLimit + } + + if restarts, ok := row.Data["G"].(float64); ok { + record.Restarts = int(restarts) + } + + if desiredSuccessfulPods, ok := row.Data["H"].(float64); ok { + record.DesiredSuccessfulPods = int(desiredSuccessfulPods) + } + + if activePods, ok := row.Data["I"].(float64); ok { + record.ActivePods = int(activePods) + } + + if failedPods, ok := row.Data["J"].(float64); ok { + record.FailedPods = int(failedPods) + } + + if successfulPods, ok := row.Data["K"].(float64); ok { + record.SuccessfulPods = int(successfulPods) + } + + record.Meta = map[string]string{} + if _, ok := jobAttrs[record.JobName]; ok { + record.Meta = jobAttrs[record.JobName] + } + + for k, v := range row.Data { + if slices.Contains(jobQueryNames, k) { + continue + } + if labelValue, ok := v.(string); ok { + record.Meta[k] = labelValue + } + } + + records = append(records, record) + } + } + resp.Total = len(allJobGroups) + resp.Records = records + + return resp, nil +} diff --git a/pkg/query-service/app/inframetrics/statefulsets.go b/pkg/query-service/app/inframetrics/statefulsets.go new file mode 100644 index 0000000000..2d5d6d8313 --- /dev/null +++ b/pkg/query-service/app/inframetrics/statefulsets.go @@ -0,0 +1,444 @@ +package inframetrics + +import ( + "context" + "math" + "sort" + + "go.signoz.io/signoz/pkg/query-service/app/metrics/v4/helpers" + "go.signoz.io/signoz/pkg/query-service/common" + "go.signoz.io/signoz/pkg/query-service/interfaces" + "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/postprocess" + "golang.org/x/exp/slices" +) + +var ( + metricToUseForStatefulSets = "k8s_pod_cpu_utilization" + k8sStatefulSetNameAttrKey = "k8s_statefulset_name" + + metricNamesForStatefulSets = map[string]string{ + "desired_pods": "k8s_statefulset_desired_pods", + "available_pods": "k8s_statefulset_current_pods", + } + + statefulSetAttrsToEnrich = []string{ + "k8s_statefulset_name", + "k8s_namespace_name", + "k8s_cluster_name", + } + + queryNamesForStatefulSets = map[string][]string{ + "cpu": {"A"}, + "cpu_request": {"B", "A"}, + "cpu_limit": {"C", "A"}, + "memory": {"D"}, + "memory_request": {"E", "D"}, + "memory_limit": {"F", "D"}, + "restarts": {"G", "A"}, + "desired_pods": {"H"}, + "available_pods": {"I"}, + } + + builderQueriesForStatefulSets = map[string]*v3.BuilderQuery{ + // desired pods + "H": { + QueryName: "H", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForStatefulSets["desired_pods"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "H", + ReduceTo: v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + // available pods + "I": { + QueryName: "I", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForStatefulSets["available_pods"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "I", + ReduceTo: v3.ReduceToOperatorLast, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + } + + statefulSetQueryNames = []string{"A", "B", "C", "D", "E", "F", "G", "H", "I"} +) + +type StatefulSetsRepo struct { + reader interfaces.Reader + querierV2 interfaces.Querier +} + +func NewStatefulSetsRepo(reader interfaces.Reader, querierV2 interfaces.Querier) *StatefulSetsRepo { + return &StatefulSetsRepo{reader: reader, querierV2: querierV2} +} + +func (d *StatefulSetsRepo) GetStatefulSetAttributeKeys(ctx context.Context, req v3.FilterAttributeKeyRequest) (*v3.FilterAttributeKeyResponse, error) { + // TODO(srikanthccv): remove hardcoded metric name and support keys from any pod metric + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = metricToUseForStatefulSets + if req.Limit == 0 { + req.Limit = 50 + } + + attributeKeysResponse, err := d.reader.GetMetricAttributeKeys(ctx, &req) + if err != nil { + return nil, err + } + + // TODO(srikanthccv): only return resource attributes when we have a way to + // distinguish between resource attributes and other attributes. + filteredKeys := []v3.AttributeKey{} + for _, key := range attributeKeysResponse.AttributeKeys { + if slices.Contains(pointAttrsToIgnore, key.Key) { + continue + } + filteredKeys = append(filteredKeys, key) + } + + return &v3.FilterAttributeKeyResponse{AttributeKeys: filteredKeys}, nil +} + +func (d *StatefulSetsRepo) GetStatefulSetAttributeValues(ctx context.Context, req v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { + req.DataSource = v3.DataSourceMetrics + req.AggregateAttribute = metricToUseForStatefulSets + if req.Limit == 0 { + req.Limit = 50 + } + + attributeValuesResponse, err := d.reader.GetMetricAttributeValues(ctx, &req) + if err != nil { + return nil, err + } + + return attributeValuesResponse, nil +} + +func (d *StatefulSetsRepo) getMetadataAttributes(ctx context.Context, req model.StatefulSetListRequest) (map[string]map[string]string, error) { + statefulSetAttrs := map[string]map[string]string{} + + for _, key := range statefulSetAttrsToEnrich { + hasKey := false + for _, groupByKey := range req.GroupBy { + if groupByKey.Key == key { + hasKey = true + break + } + } + if !hasKey { + req.GroupBy = append(req.GroupBy, v3.AttributeKey{Key: key}) + } + } + + mq := v3.BuilderQuery{ + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricToUseForStatefulSets, + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + GroupBy: req.GroupBy, + } + + query, err := helpers.PrepareTimeseriesFilterQuery(req.Start, req.End, &mq) + if err != nil { + return nil, err + } + + query = localQueryToDistributedQuery(query) + + attrsListResponse, err := d.reader.GetListResultV3(ctx, query) + if err != nil { + return nil, err + } + + for _, row := range attrsListResponse { + stringData := map[string]string{} + for key, value := range row.Data { + if str, ok := value.(string); ok { + stringData[key] = str + } else if strPtr, ok := value.(*string); ok { + stringData[key] = *strPtr + } + } + + statefulSetName := stringData[k8sStatefulSetNameAttrKey] + if _, ok := statefulSetAttrs[statefulSetName]; !ok { + statefulSetAttrs[statefulSetName] = map[string]string{} + } + + for _, key := range req.GroupBy { + statefulSetAttrs[statefulSetName][key.Key] = stringData[key.Key] + } + } + + return statefulSetAttrs, nil +} + +func (d *StatefulSetsRepo) getTopStatefulSetGroups(ctx context.Context, req model.StatefulSetListRequest, q *v3.QueryRangeParamsV3) ([]map[string]string, []map[string]string, error) { + step, timeSeriesTableName, samplesTableName := getParamsForTopStatefulSets(req) + + queryNames := queryNamesForStatefulSets[req.OrderBy.ColumnName] + topStatefulSetGroupsQueryRangeParams := &v3.QueryRangeParamsV3{ + Start: req.Start, + End: req.End, + Step: step, + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{}, + QueryType: v3.QueryTypeBuilder, + PanelType: v3.PanelTypeTable, + }, + } + + for _, queryName := range queryNames { + query := q.CompositeQuery.BuilderQueries[queryName].Clone() + query.StepInterval = step + query.MetricTableHints = &v3.MetricTableHints{ + TimeSeriesTableName: timeSeriesTableName, + SamplesTableName: samplesTableName, + } + if req.Filters != nil && len(req.Filters.Items) > 0 { + if query.Filters == nil { + query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}} + } + query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) + } + topStatefulSetGroupsQueryRangeParams.CompositeQuery.BuilderQueries[queryName] = query + } + + queryResponse, _, err := d.querierV2.QueryRange(ctx, topStatefulSetGroupsQueryRangeParams) + if err != nil { + return nil, nil, err + } + formattedResponse, err := postprocess.PostProcessResult(queryResponse, topStatefulSetGroupsQueryRangeParams) + if err != nil { + return nil, nil, err + } + + if len(formattedResponse) == 0 || len(formattedResponse[0].Series) == 0 { + return nil, nil, nil + } + + if req.OrderBy.Order == v3.DirectionDesc { + sort.Slice(formattedResponse[0].Series, func(i, j int) bool { + return formattedResponse[0].Series[i].Points[0].Value > formattedResponse[0].Series[j].Points[0].Value + }) + } else { + sort.Slice(formattedResponse[0].Series, func(i, j int) bool { + return formattedResponse[0].Series[i].Points[0].Value < formattedResponse[0].Series[j].Points[0].Value + }) + } + + limit := math.Min(float64(req.Offset+req.Limit), float64(len(formattedResponse[0].Series))) + + paginatedTopStatefulSetGroupsSeries := formattedResponse[0].Series[req.Offset:int(limit)] + + topStatefulSetGroups := []map[string]string{} + for _, series := range paginatedTopStatefulSetGroupsSeries { + topStatefulSetGroups = append(topStatefulSetGroups, series.Labels) + } + allStatefulSetGroups := []map[string]string{} + for _, series := range formattedResponse[0].Series { + allStatefulSetGroups = append(allStatefulSetGroups, series.Labels) + } + + return topStatefulSetGroups, allStatefulSetGroups, nil +} + +func (d *StatefulSetsRepo) GetStatefulSetList(ctx context.Context, req model.StatefulSetListRequest) (model.StatefulSetListResponse, error) { + resp := model.StatefulSetListResponse{} + + if req.Limit == 0 { + req.Limit = 10 + } + + if req.OrderBy == nil { + req.OrderBy = &v3.OrderBy{ColumnName: "cpu", Order: v3.DirectionDesc} + } + + if req.GroupBy == nil { + req.GroupBy = []v3.AttributeKey{{Key: k8sStatefulSetNameAttrKey}} + resp.Type = model.ResponseTypeList + } else { + resp.Type = model.ResponseTypeGroupedList + } + + step := int64(math.Max(float64(common.MinAllowedStepInterval(req.Start, req.End)), 60)) + + query := WorkloadTableListQuery.Clone() + + query.Start = req.Start + query.End = req.End + query.Step = step + + // add additional queries for stateful sets + for _, statefulSetQuery := range builderQueriesForStatefulSets { + query.CompositeQuery.BuilderQueries[statefulSetQuery.QueryName] = statefulSetQuery + } + + for _, query := range query.CompositeQuery.BuilderQueries { + query.StepInterval = step + if req.Filters != nil && len(req.Filters.Items) > 0 { + if query.Filters == nil { + query.Filters = &v3.FilterSet{Operator: "AND", Items: []v3.FilterItem{}} + } + query.Filters.Items = append(query.Filters.Items, req.Filters.Items...) + } + query.GroupBy = req.GroupBy + // make sure we only get records for daemon sets + query.Filters.Items = append(query.Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{Key: k8sStatefulSetNameAttrKey}, + Operator: v3.FilterOperatorExists, + }) + } + + statefulSetAttrs, err := d.getMetadataAttributes(ctx, req) + if err != nil { + return resp, err + } + + topStatefulSetGroups, allStatefulSetGroups, err := d.getTopStatefulSetGroups(ctx, req, query) + if err != nil { + return resp, err + } + + groupFilters := map[string][]string{} + for _, topStatefulSetGroup := range topStatefulSetGroups { + for k, v := range topStatefulSetGroup { + groupFilters[k] = append(groupFilters[k], v) + } + } + + for groupKey, groupValues := range groupFilters { + hasGroupFilter := false + if req.Filters != nil && len(req.Filters.Items) > 0 { + for _, filter := range req.Filters.Items { + if filter.Key.Key == groupKey { + hasGroupFilter = true + break + } + } + } + + if !hasGroupFilter { + for _, query := range query.CompositeQuery.BuilderQueries { + query.Filters.Items = append(query.Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{Key: groupKey}, + Value: groupValues, + Operator: v3.FilterOperatorIn, + }) + } + } + } + + queryResponse, _, err := d.querierV2.QueryRange(ctx, query) + if err != nil { + return resp, err + } + + formattedResponse, err := postprocess.PostProcessResult(queryResponse, query) + if err != nil { + return resp, err + } + + records := []model.StatefulSetListRecord{} + + for _, result := range formattedResponse { + for _, row := range result.Table.Rows { + + record := model.StatefulSetListRecord{ + StatefulSetName: "", + CPUUsage: -1, + CPURequest: -1, + CPULimit: -1, + MemoryUsage: -1, + MemoryRequest: -1, + MemoryLimit: -1, + DesiredPods: -1, + AvailablePods: -1, + } + + if statefulSetName, ok := row.Data[k8sStatefulSetNameAttrKey].(string); ok { + record.StatefulSetName = statefulSetName + } + + if cpu, ok := row.Data["A"].(float64); ok { + record.CPUUsage = cpu + } + if cpuRequest, ok := row.Data["B"].(float64); ok { + record.CPURequest = cpuRequest + } + + if cpuLimit, ok := row.Data["C"].(float64); ok { + record.CPULimit = cpuLimit + } + + if memory, ok := row.Data["D"].(float64); ok { + record.MemoryUsage = memory + } + + if memoryRequest, ok := row.Data["E"].(float64); ok { + record.MemoryRequest = memoryRequest + } + + if memoryLimit, ok := row.Data["F"].(float64); ok { + record.MemoryLimit = memoryLimit + } + + if restarts, ok := row.Data["G"].(float64); ok { + record.Restarts = int(restarts) + } + + if desiredPods, ok := row.Data["H"].(float64); ok { + record.DesiredPods = int(desiredPods) + } + + if availablePods, ok := row.Data["I"].(float64); ok { + record.AvailablePods = int(availablePods) + } + + record.Meta = map[string]string{} + if _, ok := statefulSetAttrs[record.StatefulSetName]; ok { + record.Meta = statefulSetAttrs[record.StatefulSetName] + } + + for k, v := range row.Data { + if slices.Contains(statefulSetQueryNames, k) { + continue + } + if labelValue, ok := v.(string); ok { + record.Meta[k] = labelValue + } + } + + records = append(records, record) + } + } + resp.Total = len(allStatefulSetGroups) + resp.Records = records + + return resp, nil +} diff --git a/pkg/query-service/app/inframetrics/workload_query.go b/pkg/query-service/app/inframetrics/workload_query.go new file mode 100644 index 0000000000..6050dba50d --- /dev/null +++ b/pkg/query-service/app/inframetrics/workload_query.go @@ -0,0 +1,166 @@ +package inframetrics + +import v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + +var ( + metricNamesForWorkloads = map[string]string{ + "cpu": "k8s_pod_cpu_utilization", + "cpu_req": "k8s_pod_cpu_request_utilization", + "cpu_limit": "k8s_pod_cpu_limit_utilization", + "memory": "k8s_pod_memory_usage", + "memory_req": "k8s_pod_memory_request_utilization", + "memory_limit": "k8s_pod_memory_limit_utilization", + "restarts": "k8s_container_restarts", + } +) + +var WorkloadTableListQuery = v3.QueryRangeParamsV3{ + CompositeQuery: &v3.CompositeQuery{ + BuilderQueries: map[string]*v3.BuilderQuery{ + // pod cpu utilization + "A": { + QueryName: "A", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForWorkloads["cpu"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "A", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + // pod cpu request utilization + "B": { + QueryName: "B", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForWorkloads["cpu_request"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "B", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + // pod cpu limit utilization + "C": { + QueryName: "C", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForWorkloads["cpu_limit"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "C", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + // pod memory utilization + "D": { + QueryName: "D", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForWorkloads["memory"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "D", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + // pod memory request utilization + "E": { + QueryName: "E", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForWorkloads["memory_request"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "E", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + // pod memory limit utilization + "F": { + QueryName: "F", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForWorkloads["memory_limit"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "F", + ReduceTo: v3.ReduceToOperatorAvg, + TimeAggregation: v3.TimeAggregationAvg, + SpaceAggregation: v3.SpaceAggregationSum, + Disabled: false, + }, + "G": { + QueryName: "G", + DataSource: v3.DataSourceMetrics, + AggregateAttribute: v3.AttributeKey{ + Key: metricNamesForWorkloads["restarts"], + DataType: v3.AttributeKeyDataTypeFloat64, + }, + Temporality: v3.Unspecified, + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{}, + }, + GroupBy: []v3.AttributeKey{}, + Expression: "G", + ReduceTo: v3.ReduceToOperatorSum, + TimeAggregation: v3.TimeAggregationAnyLast, + SpaceAggregation: v3.SpaceAggregationMax, + Functions: []v3.Function{{Name: v3.FunctionNameRunningDiff}}, + Disabled: false, + }, + }, + PanelType: v3.PanelTypeTable, + QueryType: v3.QueryTypeBuilder, + }, + Version: "v4", + FormatForWeb: true, +} diff --git a/pkg/query-service/model/infra.go b/pkg/query-service/model/infra.go index 00cb48ee77..0c23a0642b 100644 --- a/pkg/query-service/model/infra.go +++ b/pkg/query-service/model/infra.go @@ -173,3 +173,125 @@ type ClusterListRecord struct { MemoryAllocatable float64 `json:"memoryAllocatable"` Meta map[string]string `json:"meta"` } + +type DeploymentListRequest struct { + Start int64 `json:"start"` // epoch time in ms + End int64 `json:"end"` // epoch time in ms + Filters *v3.FilterSet `json:"filters"` + GroupBy []v3.AttributeKey `json:"groupBy"` + OrderBy *v3.OrderBy `json:"orderBy"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} + +type DeploymentListResponse struct { + Type ResponseType `json:"type"` + Records []DeploymentListRecord `json:"records"` + Total int `json:"total"` +} + +type DeploymentListRecord struct { + DeploymentName string `json:"deploymentName"` + CPUUsage float64 `json:"cpuUsage"` + MemoryUsage float64 `json:"memoryUsage"` + DesiredPods int `json:"desiredPods"` + AvailablePods int `json:"availablePods"` + CPURequest float64 `json:"cpuRequest"` + MemoryRequest float64 `json:"memoryRequest"` + CPULimit float64 `json:"cpuLimit"` + MemoryLimit float64 `json:"memoryLimit"` + Restarts int `json:"restarts"` + Meta map[string]string `json:"meta"` +} + +type DaemonSetListRequest struct { + Start int64 `json:"start"` // epoch time in ms + End int64 `json:"end"` // epoch time in ms + Filters *v3.FilterSet `json:"filters"` + GroupBy []v3.AttributeKey `json:"groupBy"` + OrderBy *v3.OrderBy `json:"orderBy"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} + +type DaemonSetListResponse struct { + Type ResponseType `json:"type"` + Records []DaemonSetListRecord `json:"records"` + Total int `json:"total"` +} + +type DaemonSetListRecord struct { + DaemonSetName string `json:"daemonSetName"` + CPUUsage float64 `json:"cpuUsage"` + MemoryUsage float64 `json:"memoryUsage"` + CPURequest float64 `json:"cpuRequest"` + MemoryRequest float64 `json:"memoryRequest"` + CPULimit float64 `json:"cpuLimit"` + MemoryLimit float64 `json:"memoryLimit"` + Restarts int `json:"restarts"` + DesiredNodes int `json:"desiredNodes"` + AvailableNodes int `json:"availableNodes"` + Meta map[string]string `json:"meta"` +} + +type StatefulSetListRequest struct { + Start int64 `json:"start"` // epoch time in ms + End int64 `json:"end"` // epoch time in ms + Filters *v3.FilterSet `json:"filters"` + GroupBy []v3.AttributeKey `json:"groupBy"` + OrderBy *v3.OrderBy `json:"orderBy"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} + +type StatefulSetListResponse struct { + Type ResponseType `json:"type"` + Records []StatefulSetListRecord `json:"records"` + Total int `json:"total"` +} + +type StatefulSetListRecord struct { + StatefulSetName string `json:"statefulSetName"` + CPUUsage float64 `json:"cpuUsage"` + MemoryUsage float64 `json:"memoryUsage"` + CPURequest float64 `json:"cpuRequest"` + MemoryRequest float64 `json:"memoryRequest"` + CPULimit float64 `json:"cpuLimit"` + MemoryLimit float64 `json:"memoryLimit"` + Restarts int `json:"restarts"` + DesiredPods int `json:"desiredPods"` + AvailablePods int `json:"availablePods"` + Meta map[string]string `json:"meta"` +} + +type JobListRequest struct { + Start int64 `json:"start"` // epoch time in ms + End int64 `json:"end"` // epoch time in ms + Filters *v3.FilterSet `json:"filters"` + GroupBy []v3.AttributeKey `json:"groupBy"` + OrderBy *v3.OrderBy `json:"orderBy"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} + +type JobListResponse struct { + Type ResponseType `json:"type"` + Records []JobListRecord `json:"records"` + Total int `json:"total"` +} + +type JobListRecord struct { + JobName string `json:"jobName"` + CPUUsage float64 `json:"cpuUsage"` + MemoryUsage float64 `json:"memoryUsage"` + CPURequest float64 `json:"cpuRequest"` + MemoryRequest float64 `json:"memoryRequest"` + CPULimit float64 `json:"cpuLimit"` + MemoryLimit float64 `json:"memoryLimit"` + Restarts int `json:"restarts"` + DesiredSuccessfulPods int `json:"desiredSuccessfulPods"` + ActivePods int `json:"activePods"` + FailedPods int `json:"failedPods"` + SuccessfulPods int `json:"successfulPods"` + Meta map[string]string `json:"meta"` +} From 85ac21f2533a2fe566044307b720554db5620013 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Tue, 12 Nov 2024 22:52:42 +0530 Subject: [PATCH 2/3] fix: update request payload for span metrics queries (#6323) --- .../MetricsPageQueries/DBCallQueries.ts | 5 +++ .../MetricsPageQueries/ExternalQueries.ts | 5 +++ .../MetricsPageQueriesFactory.ts | 4 +++ .../MetricsPageQueries/OverviewQueries.ts | 18 +++++++++++ .../MetricsApplication/Tabs/types.ts | 2 ++ .../container/MetricsApplication/constant.ts | 32 ++++++++++++++++--- 6 files changed, 61 insertions(+), 5 deletions(-) diff --git a/frontend/src/container/MetricsApplication/MetricsPageQueries/DBCallQueries.ts b/frontend/src/container/MetricsApplication/MetricsPageQueries/DBCallQueries.ts index 91343b7b06..f3124f0ad1 100644 --- a/frontend/src/container/MetricsApplication/MetricsPageQueries/DBCallQueries.ts +++ b/frontend/src/container/MetricsApplication/MetricsPageQueries/DBCallQueries.ts @@ -58,12 +58,17 @@ export const databaseCallsRPS = ({ const legends = [legend]; const dataSource = DataSource.METRICS; + const timeAggregateOperators = [MetricAggregateOperator.RATE]; + const spaceAggregateOperators = [MetricAggregateOperator.SUM]; + return getQueryBuilderQueries({ autocompleteData, groupBy, legends, filterItems, dataSource, + timeAggregateOperators, + spaceAggregateOperators, }); }; diff --git a/frontend/src/container/MetricsApplication/MetricsPageQueries/ExternalQueries.ts b/frontend/src/container/MetricsApplication/MetricsPageQueries/ExternalQueries.ts index a2c87f0874..6a7ab65906 100644 --- a/frontend/src/container/MetricsApplication/MetricsPageQueries/ExternalQueries.ts +++ b/frontend/src/container/MetricsApplication/MetricsPageQueries/ExternalQueries.ts @@ -213,12 +213,17 @@ export const externalCallRpsByAddress = ({ const legends = [legend]; const dataSource = DataSource.METRICS; + const timeAggregateOperators = [MetricAggregateOperator.RATE]; + const spaceAggregateOperators = [MetricAggregateOperator.SUM]; + return getQueryBuilderQueries({ autocompleteData, groupBy, legends, filterItems, dataSource, + timeAggregateOperators, + spaceAggregateOperators, }); }; diff --git a/frontend/src/container/MetricsApplication/MetricsPageQueries/MetricsPageQueriesFactory.ts b/frontend/src/container/MetricsApplication/MetricsPageQueries/MetricsPageQueriesFactory.ts index 71a16fcc07..e8b0fcc807 100644 --- a/frontend/src/container/MetricsApplication/MetricsPageQueries/MetricsPageQueriesFactory.ts +++ b/frontend/src/container/MetricsApplication/MetricsPageQueries/MetricsPageQueriesFactory.ts @@ -25,6 +25,8 @@ export const getQueryBuilderQueries = ({ aggregateOperator, dataSource, queryNameAndExpression, + timeAggregateOperators, + spaceAggregateOperators, }: BuilderQueriesProps): QueryBuilderData => ({ queryFormulas: [], queryData: autocompleteData.map((item, index) => { @@ -50,6 +52,8 @@ export const getQueryBuilderQueries = ({ op: 'AND', }, reduceTo: 'avg', + spaceAggregation: spaceAggregateOperators[index], + timeAggregation: timeAggregateOperators[index], dataSource, }; diff --git a/frontend/src/container/MetricsApplication/MetricsPageQueries/OverviewQueries.ts b/frontend/src/container/MetricsApplication/MetricsPageQueries/OverviewQueries.ts index d27bfc01be..0d2c05a349 100644 --- a/frontend/src/container/MetricsApplication/MetricsPageQueries/OverviewQueries.ts +++ b/frontend/src/container/MetricsApplication/MetricsPageQueries/OverviewQueries.ts @@ -83,6 +83,17 @@ export const latency = ({ const dataSource = isSpanMetricEnable ? DataSource.METRICS : DataSource.TRACES; const queryNameAndExpression = QUERYNAME_AND_EXPRESSION; + const timeAggregateOperators = [ + MetricAggregateOperator.EMPTY, + MetricAggregateOperator.EMPTY, + MetricAggregateOperator.EMPTY, + ]; + const spaceAggregateOperators = [ + MetricAggregateOperator.P50, + MetricAggregateOperator.P90, + MetricAggregateOperator.P99, + ]; + return getQueryBuilderQueries({ autocompleteData, legends, @@ -90,6 +101,8 @@ export const latency = ({ aggregateOperator, dataSource, queryNameAndExpression, + timeAggregateOperators, + spaceAggregateOperators, }); }; @@ -510,11 +523,16 @@ export const operationPerSec = ({ const legends = OPERATION_LEGENDS; const dataSource = DataSource.METRICS; + const timeAggregateOperators = [MetricAggregateOperator.RATE]; + const spaceAggregateOperators = [MetricAggregateOperator.SUM]; + return getQueryBuilderQueries({ autocompleteData, legends, filterItems, dataSource, + timeAggregateOperators, + spaceAggregateOperators, }); }; diff --git a/frontend/src/container/MetricsApplication/Tabs/types.ts b/frontend/src/container/MetricsApplication/Tabs/types.ts index 9b45bd5492..4dcb3bc01e 100644 --- a/frontend/src/container/MetricsApplication/Tabs/types.ts +++ b/frontend/src/container/MetricsApplication/Tabs/types.ts @@ -29,6 +29,8 @@ export interface BuilderQueriesProps { aggregateOperator?: string[]; dataSource: DataSource; queryNameAndExpression?: string[]; + timeAggregateOperators: MetricAggregateOperator[]; + spaceAggregateOperators: MetricAggregateOperator[]; } export interface BuilderQuerieswithFormulaProps { diff --git a/frontend/src/container/MetricsApplication/constant.ts b/frontend/src/container/MetricsApplication/constant.ts index decd31534b..75853cc8ea 100644 --- a/frontend/src/container/MetricsApplication/constant.ts +++ b/frontend/src/container/MetricsApplication/constant.ts @@ -2,18 +2,27 @@ import { DownloadOptions } from 'container/Download/Download.types'; import { MenuItemKeys } from 'container/GridCardLayout/WidgetHeader/contants'; +import { + MetricAggregateOperator, + TracesAggregatorOperator, +} from 'types/common/queryBuilder'; export const legend = { address: '{{address}}', }; export const QUERYNAME_AND_EXPRESSION = ['A', 'B', 'C']; -export const LATENCY_AGGREGATEOPERATOR = ['p50', 'p90', 'p99']; +export const LATENCY_AGGREGATEOPERATOR = [ + TracesAggregatorOperator.P50, + TracesAggregatorOperator.P90, + TracesAggregatorOperator.P99, +]; export const LATENCY_AGGREGATEOPERATOR_SPAN_METRICS = [ - 'hist_quantile_50', - 'hist_quantile_90', - 'hist_quantile_99', + MetricAggregateOperator.P50, + MetricAggregateOperator.P90, + MetricAggregateOperator.P99, ]; + export const OPERATION_LEGENDS = ['Operations']; export const MENU_ITEMS = [MenuItemKeys.View, MenuItemKeys.CreateAlerts]; @@ -21,8 +30,21 @@ export const MENU_ITEMS = [MenuItemKeys.View, MenuItemKeys.CreateAlerts]; export enum FORMULA { ERROR_PERCENTAGE = 'A*100/B', DATABASE_CALLS_AVG_DURATION = 'A/B', + // The apdex formula is (satisfied_count + 0.5 * tolerating_count + 0 * frustating_count) / total_count + // The satisfied_count is B, tolerating_count is C, total_count is A + // But why do we have (B+C)/2 instead of B + C/2? + // The way we issue the query is latency <= threshold, which means we over count i.e + // query B => durationNano <= 500ms + // query C => durationNano <= 2000ms + // Since <= 2000ms includes <= 500ms, we over count, to correct we subtract B/2 + // so the full expression would be (B + C/2) - B/2 = (B+C)/2 APDEX_TRACES = '((B + C)/2)/A', - APDEX_DELTA_SPAN_METRICS = '((B + C)/2)/A', + // Does the same not apply for delta span metrics? + // No, because the delta metrics store the counts just for the current bucket + // so we don't need to subtract anything + APDEX_DELTA_SPAN_METRICS = '(B + C)/A', + // Cumulative span metrics store the counts for all buckets + // so we need to subtract B/2 to correct the over counting APDEX_CUMULATIVE_SPAN_METRICS = '((B + C)/2)/A', } From 01fda5195936ad3387c6b348a91b3dec006699df Mon Sep 17 00:00:00 2001 From: Vikrant Gupta Date: Wed, 13 Nov 2024 00:25:00 +0530 Subject: [PATCH 3/3] chore: return proper http codes on unique constraint error (#6428) --- ee/query-service/license/db.go | 13 ++++++++++--- ee/query-service/license/manager.go | 2 +- pkg/query-service/app/http_handler.go | 2 ++ 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/ee/query-service/license/db.go b/ee/query-service/license/db.go index 12df69233d..eae48e266d 100644 --- a/ee/query-service/license/db.go +++ b/ee/query-service/license/db.go @@ -8,6 +8,7 @@ import ( "time" "github.com/jmoiron/sqlx" + "github.com/mattn/go-sqlite3" "go.signoz.io/signoz/ee/query-service/license/sqlite" "go.signoz.io/signoz/ee/query-service/model" @@ -274,14 +275,14 @@ func (r *Repo) InitFeatures(req basemodel.FeatureSet) error { } // InsertLicenseV3 inserts a new license v3 in db -func (r *Repo) InsertLicenseV3(ctx context.Context, l *model.LicenseV3) error { +func (r *Repo) InsertLicenseV3(ctx context.Context, l *model.LicenseV3) *model.ApiError { query := `INSERT INTO licenses_v3 (id, key, data) VALUES ($1, $2, $3)` // licsense is the entity of zeus so putting the entire license here without defining schema licenseData, err := json.Marshal(l.Data) if err != nil { - return fmt.Errorf("insert license failed: license marshal error") + return &model.ApiError{Typ: basemodel.ErrorBadData, Err: err} } _, err = r.db.ExecContext(ctx, @@ -292,8 +293,14 @@ func (r *Repo) InsertLicenseV3(ctx context.Context, l *model.LicenseV3) error { ) if err != nil { + if sqliteErr, ok := err.(sqlite3.Error); ok { + if sqliteErr.ExtendedCode == sqlite3.ErrConstraintUnique { + zap.L().Error("error in inserting license data: ", zap.Error(sqliteErr)) + return &model.ApiError{Typ: model.ErrorConflict, Err: sqliteErr} + } + } zap.L().Error("error in inserting license data: ", zap.Error(err)) - return fmt.Errorf("failed to insert license in db: %v", err) + return &model.ApiError{Typ: basemodel.ErrorExec, Err: err} } return nil diff --git a/ee/query-service/license/manager.go b/ee/query-service/license/manager.go index 13b869da8c..6dcc704e3a 100644 --- a/ee/query-service/license/manager.go +++ b/ee/query-service/license/manager.go @@ -463,7 +463,7 @@ func (lm *Manager) ActivateV3(ctx context.Context, licenseKey string) (licenseRe err := lm.repo.InsertLicenseV3(ctx, license) if err != nil { zap.L().Error("failed to activate license", zap.Error(err)) - return nil, model.InternalError(err) + return nil, err } // license is valid, activate it diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 0f6d351af7..6586e21d98 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -332,6 +332,8 @@ func RespondError(w http.ResponseWriter, apiErr model.BaseApiError, data interfa code = http.StatusUnauthorized case model.ErrorForbidden: code = http.StatusForbidden + case model.ErrorConflict: + code = http.StatusConflict default: code = http.StatusInternalServerError }