From 67e822e23ef5744618b5a7d9e516c3c30a35e6c3 Mon Sep 17 00:00:00 2001 From: Nityananda Gohain Date: Thu, 19 Dec 2024 11:52:20 +0700 Subject: [PATCH] feat: api for trace materialization (#6646) * feat: api for trace materialization * fix: minor changes and cleanup * fix: minor fixes * fix: update errors * fix: address comments * fix: address comments --- .../app/clickhouseReader/reader.go | 177 +++++++++++++++++- pkg/query-service/app/http_handler.go | 35 ++++ pkg/query-service/app/logs/parser.go | 8 +- pkg/query-service/app/logs/parser_test.go | 8 +- pkg/query-service/app/logs/validator.go | 34 ++++ pkg/query-service/constants/constants.go | 2 +- pkg/query-service/interfaces/interface.go | 4 + pkg/query-service/model/response.go | 6 +- 8 files changed, 254 insertions(+), 20 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 9c7828af45..2a1c1f5782 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -2694,8 +2694,8 @@ func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Contex } // remove this after sometime -func removeUnderscoreDuplicateFields(fields []model.LogField) []model.LogField { - lookup := map[string]model.LogField{} +func removeUnderscoreDuplicateFields(fields []model.Field) []model.Field { + lookup := map[string]model.Field{} for _, v := range fields { lookup[v.Name+v.DataType] = v } @@ -2706,7 +2706,7 @@ func removeUnderscoreDuplicateFields(fields []model.LogField) []model.LogField { } } - updatedFields := []model.LogField{} + updatedFields := []model.Field{} for _, v := range lookup { updatedFields = append(updatedFields, v) } @@ -2717,11 +2717,11 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe // response will contain top level fields from the otel log model response := model.GetFieldsResponse{ Selected: constants.StaticSelectedLogFields, - Interesting: []model.LogField{}, + Interesting: []model.Field{}, } // get attribute keys - attributes := []model.LogField{} + attributes := []model.Field{} query := fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsAttributeKeys) err := r.db.Select(ctx, &attributes, query) if err != nil { @@ -2729,7 +2729,7 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe } // get resource keys - resources := []model.LogField{} + resources := []model.Field{} query = fmt.Sprintf("SELECT DISTINCT name, datatype from %s.%s group by name, datatype", r.logsDB, r.logsResourceKeys) err = r.db.Select(ctx, &resources, query) if err != nil { @@ -2753,9 +2753,11 @@ func (r *ClickHouseReader) GetLogFields(ctx context.Context) (*model.GetFieldsRe return &response, nil } -func (r *ClickHouseReader) extractSelectedAndInterestingFields(tableStatement string, fieldType string, fields *[]model.LogField, response *model.GetFieldsResponse) { +func (r *ClickHouseReader) extractSelectedAndInterestingFields(tableStatement string, overrideFieldType string, fields *[]model.Field, response *model.GetFieldsResponse) { for _, field := range *fields { - field.Type = fieldType + if overrideFieldType != "" { + field.Type = overrideFieldType + } // all static fields are assumed to be selected as we don't allow changing them if isColumn(r.useLogsNewSchema, tableStatement, field.Type, field.Name, field.DataType) { response.Selected = append(response.Selected, field) @@ -2945,6 +2947,165 @@ func (r *ClickHouseReader) UpdateLogField(ctx context.Context, field *model.Upda return nil } +func (r *ClickHouseReader) GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) { + // response will contain top level fields from the otel trace model + response := model.GetFieldsResponse{ + Selected: []model.Field{}, + Interesting: []model.Field{}, + } + + // get the top level selected fields + for _, field := range constants.NewStaticFieldsTraces { + if (v3.AttributeKey{} == field) { + continue + } + response.Selected = append(response.Selected, model.Field{ + Name: field.Key, + DataType: field.DataType.String(), + Type: constants.Static, + }) + } + + // get attribute keys + attributes := []model.Field{} + query := fmt.Sprintf("SELECT tagKey, tagType, dataType from %s.%s group by tagKey, tagType, dataType", r.TraceDB, r.spanAttributesKeysTable) + rows, err := r.db.Query(ctx, query) + if err != nil { + return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + defer rows.Close() + + var tagKey string + var dataType string + var tagType string + for rows.Next() { + if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil { + return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + attributes = append(attributes, model.Field{ + Name: tagKey, + DataType: dataType, + Type: tagType, + }) + } + + statements := []model.ShowCreateTableStatement{} + query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceLocalTableName) + err = r.db.Select(ctx, &statements, query) + if err != nil { + return nil, &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + + r.extractSelectedAndInterestingFields(statements[0].Statement, "", &attributes, &response) + + return &response, nil + +} + +func (r *ClickHouseReader) UpdateTraceField(ctx context.Context, field *model.UpdateField) *model.ApiError { + if !field.Selected { + return model.ForbiddenError(errors.New("removing a selected field is not allowed, please reach out to support.")) + } + + // name of the materialized column + colname := utils.GetClickhouseColumnNameV2(field.Type, field.DataType, field.Name) + + field.DataType = strings.ToLower(field.DataType) + + // dataType and chDataType of the materialized column + var dataTypeMap = map[string]string{ + "string": "string", + "bool": "bool", + "int64": "number", + "float64": "number", + } + var chDataTypeMap = map[string]string{ + "string": "String", + "bool": "Bool", + "int64": "Float64", + "float64": "Float64", + } + chDataType := chDataTypeMap[field.DataType] + dataType := dataTypeMap[field.DataType] + + // typeName: tag => attributes, resource => resources + typeName := field.Type + if field.Type == string(v3.AttributeKeyTypeTag) { + typeName = constants.Attributes + } else if field.Type == string(v3.AttributeKeyTypeResource) { + typeName = constants.Resources + } + + attrColName := fmt.Sprintf("%s_%s", typeName, dataType) + for _, table := range []string{r.traceLocalTableName, r.traceTableName} { + q := "ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s` %s DEFAULT %s['%s'] CODEC(ZSTD(1))" + query := fmt.Sprintf(q, + r.TraceDB, table, + r.cluster, + colname, chDataType, + attrColName, + field.Name, + ) + err := r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + + query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD COLUMN IF NOT EXISTS `%s_exists` bool DEFAULT if(mapContains(%s, '%s') != 0, true, false) CODEC(ZSTD(1))", + r.TraceDB, table, + r.cluster, + colname, + attrColName, + field.Name, + ) + err = r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + } + + // create the index + if strings.ToLower(field.DataType) == "bool" { + // there is no point in creating index for bool attributes as the cardinality is just 2 + return nil + } + + if field.IndexType == "" { + field.IndexType = constants.DefaultLogSkipIndexType + } + if field.IndexGranularity == 0 { + field.IndexGranularity = constants.DefaultLogSkipIndexGranularity + } + query := fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS `%s_idx` (`%s`) TYPE %s GRANULARITY %d", + r.TraceDB, r.traceLocalTableName, + r.cluster, + colname, + colname, + field.IndexType, + field.IndexGranularity, + ) + err := r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + + // add a default minmax index for numbers + if dataType == "number" { + query = fmt.Sprintf("ALTER TABLE %s.%s ON CLUSTER %s ADD INDEX IF NOT EXISTS `%s_minmax_idx` (`%s`) TYPE minmax GRANULARITY 1", + r.TraceDB, r.traceLocalTableName, + r.cluster, + colname, + colname, + ) + err = r.db.Exec(ctx, query) + if err != nil { + return &model.ApiError{Err: err, Typ: model.ErrorInternal} + } + } + + return nil +} + func (r *ClickHouseReader) GetLogs(ctx context.Context, params *model.LogsFilterParams) (*[]model.SignozLog, *model.ApiError) { response := []model.SignozLog{} fields, apiErr := r.GetLogFields(ctx) diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index ba16894438..ee88eb7056 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -527,6 +527,9 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router, am *AuthMiddleware) { router.HandleFunc("/api/v1/settings/ingestion_key", am.AdminAccess(aH.insertIngestionKey)).Methods(http.MethodPost) router.HandleFunc("/api/v1/settings/ingestion_key", am.ViewAccess(aH.getIngestionKeys)).Methods(http.MethodGet) + router.HandleFunc("/api/v2/traces/fields", am.ViewAccess(aH.traceFields)).Methods(http.MethodGet) + router.HandleFunc("/api/v2/traces/fields", am.EditAccess(aH.updateTraceField)).Methods(http.MethodPost) + router.HandleFunc("/api/v1/version", am.OpenAccess(aH.getVersion)).Methods(http.MethodGet) router.HandleFunc("/api/v1/featureFlags", am.OpenAccess(aH.getFeatureFlags)).Methods(http.MethodGet) router.HandleFunc("/api/v1/configs", am.OpenAccess(aH.getConfigs)).Methods(http.MethodGet) @@ -4892,3 +4895,35 @@ func (aH *APIHandler) QueryRangeV4(w http.ResponseWriter, r *http.Request) { aH.queryRangeV4(r.Context(), queryRangeParams, w, r) } + +func (aH *APIHandler) traceFields(w http.ResponseWriter, r *http.Request) { + fields, apiErr := aH.reader.GetTraceFields(r.Context()) + if apiErr != nil { + RespondError(w, apiErr, "failed to fetch fields from the db") + return + } + aH.WriteJSON(w, r, fields) +} + +func (aH *APIHandler) updateTraceField(w http.ResponseWriter, r *http.Request) { + field := model.UpdateField{} + if err := json.NewDecoder(r.Body).Decode(&field); err != nil { + apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErr, "failed to decode payload") + return + } + + err := logs.ValidateUpdateFieldPayloadV2(&field) + if err != nil { + apiErr := &model.ApiError{Typ: model.ErrorBadData, Err: err} + RespondError(w, apiErr, "incorrect payload") + return + } + + apiErr := aH.reader.UpdateTraceField(r.Context(), &field) + if apiErr != nil { + RespondError(w, apiErr, "failed to update field in the db") + return + } + aH.WriteJSON(w, r, field) +} diff --git a/pkg/query-service/app/logs/parser.go b/pkg/query-service/app/logs/parser.go index 855a023528..be524d00da 100644 --- a/pkg/query-service/app/logs/parser.go +++ b/pkg/query-service/app/logs/parser.go @@ -228,8 +228,8 @@ func parseColumn(s string) (*string, error) { return &colName, nil } -func arrayToMap(fields []model.LogField) map[string]model.LogField { - res := map[string]model.LogField{} +func arrayToMap(fields []model.Field) map[string]model.Field { + res := map[string]model.Field{} for _, field := range fields { res[field.Name] = field } @@ -251,7 +251,7 @@ func replaceInterestingFields(allFields *model.GetFieldsResponse, queryTokens [] return queryTokens, nil } -func replaceFieldInToken(queryToken string, selectedFieldsLookup map[string]model.LogField, interestingFieldLookup map[string]model.LogField) (string, error) { +func replaceFieldInToken(queryToken string, selectedFieldsLookup map[string]model.Field, interestingFieldLookup map[string]model.Field) (string, error) { op := strings.TrimSpace(operatorRegex.FindString(queryToken)) opLower := strings.ToLower(op) @@ -283,7 +283,7 @@ func replaceFieldInToken(queryToken string, selectedFieldsLookup map[string]mode } } else { // creating the query token here as we have the metadata - field := model.LogField{} + field := model.Field{} if sfield, ok := selectedFieldsLookup[sqlColName]; ok { field = sfield diff --git a/pkg/query-service/app/logs/parser_test.go b/pkg/query-service/app/logs/parser_test.go index 6397738437..f894fcaecd 100644 --- a/pkg/query-service/app/logs/parser_test.go +++ b/pkg/query-service/app/logs/parser_test.go @@ -238,14 +238,14 @@ func TestParseColumn(t *testing.T) { func TestReplaceInterestingFields(t *testing.T) { queryTokens := []string{"id.userid IN (100) ", "and id_key >= 50 ", `AND body ILIKE '%searchstring%'`} allFields := model.GetFieldsResponse{ - Selected: []model.LogField{ + Selected: []model.Field{ { Name: "id_key", DataType: "int64", Type: "attributes", }, }, - Interesting: []model.LogField{ + Interesting: []model.Field{ { Name: "id.userid", DataType: "int64", @@ -326,7 +326,7 @@ func TestCheckIfPrevousPaginateAndModifyOrder(t *testing.T) { } var generateSQLQueryFields = model.GetFieldsResponse{ - Selected: []model.LogField{ + Selected: []model.Field{ { Name: "field1", DataType: "int64", @@ -348,7 +348,7 @@ var generateSQLQueryFields = model.GetFieldsResponse{ Type: "static", }, }, - Interesting: []model.LogField{ + Interesting: []model.Field{ { Name: "FielD1", DataType: "int64", diff --git a/pkg/query-service/app/logs/validator.go b/pkg/query-service/app/logs/validator.go index d4a1e42234..03432922dd 100644 --- a/pkg/query-service/app/logs/validator.go +++ b/pkg/query-service/app/logs/validator.go @@ -6,6 +6,7 @@ import ( "go.signoz.io/signoz/pkg/query-service/constants" "go.signoz.io/signoz/pkg/query-service/model" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) func ValidateUpdateFieldPayload(field *model.UpdateField) error { @@ -38,3 +39,36 @@ func ValidateUpdateFieldPayload(field *model.UpdateField) error { } return nil } + +func ValidateUpdateFieldPayloadV2(field *model.UpdateField) error { + if field.Name == "" { + return fmt.Errorf("name cannot be empty") + } + if field.Type == "" { + return fmt.Errorf("type cannot be empty") + } + if field.DataType == "" { + return fmt.Errorf("dataType cannot be empty") + } + + // the logs api uses the old names i.e attributes and resources while traces use tag and attribute. + // update log api to use tag and attribute. + matched, err := regexp.MatchString(fmt.Sprintf("^(%s|%s)$", v3.AttributeKeyTypeTag, v3.AttributeKeyTypeResource), field.Type) + if err != nil { + return err + } + if !matched { + return fmt.Errorf("type %s not supported", field.Type) + } + + if field.IndexType != "" { + matched, err := regexp.MatchString(`^(minmax|set\([0-9]\)|bloom_filter\((0?.?[0-9]+|1)\)|tokenbf_v1\([0-9]+,[0-9]+,[0-9]+\)|ngrambf_v1\([0-9]+,[0-9]+,[0-9]+,[0-9]+\))$`, field.IndexType) + if err != nil { + return err + } + if !matched { + return fmt.Errorf("index type %s not supported", field.IndexType) + } + } + return nil +} diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index 7d6f087188..242b2cd4a3 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -290,7 +290,7 @@ const ( UINT8 = "Uint8" ) -var StaticSelectedLogFields = []model.LogField{ +var StaticSelectedLogFields = []model.Field{ { Name: "timestamp", DataType: UINT32, diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index a2acd8c6c9..ac4ab91f9e 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -109,6 +109,10 @@ type Reader interface { SubscribeToQueryProgress(queryId string) (<-chan model.QueryProgress, func(), *model.ApiError) GetCountOfThings(ctx context.Context, query string) (uint64, error) + + //trace + GetTraceFields(ctx context.Context) (*model.GetFieldsResponse, *model.ApiError) + UpdateTraceField(ctx context.Context, field *model.UpdateField) *model.ApiError } type Querier interface { diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 5058d71534..740dfa6ceb 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -509,15 +509,15 @@ type ShowCreateTableStatement struct { Statement string `json:"statement" ch:"statement"` } -type LogField struct { +type Field struct { Name string `json:"name" ch:"name"` DataType string `json:"dataType" ch:"datatype"` Type string `json:"type"` } type GetFieldsResponse struct { - Selected []LogField `json:"selected"` - Interesting []LogField `json:"interesting"` + Selected []Field `json:"selected"` + Interesting []Field `json:"interesting"` } // Represents a log record in query service requests and responses.