From d0a3ed3f3a28ebacc9c4b8f6d5f857de3b61f599 Mon Sep 17 00:00:00 2001 From: mehul Date: Mon, 7 Oct 2024 22:52:20 +0530 Subject: [PATCH 1/4] created mapping.go Signed-off-by: mehul --- .../storage/cassandra/mappings/connection.go | 35 +++ plugin/storage/cassandra/mappings/mapping.go | 231 ++++++++++++++++++ plugin/storage/cassandra/mappings/test.go | 24 ++ .../storage/cassandra/mappings/v001.cql.tmpl | 207 ++++++++++++++++ .../storage/cassandra/mappings/v002.cql.tmpl | 203 +++++++++++++++ .../storage/cassandra/mappings/v003.cql.tmpl | 229 +++++++++++++++++ .../storage/cassandra/mappings/v004.cql.tmpl | 222 +++++++++++++++++ 7 files changed, 1151 insertions(+) create mode 100644 plugin/storage/cassandra/mappings/connection.go create mode 100644 plugin/storage/cassandra/mappings/mapping.go create mode 100644 plugin/storage/cassandra/mappings/test.go create mode 100644 plugin/storage/cassandra/mappings/v001.cql.tmpl create mode 100644 plugin/storage/cassandra/mappings/v002.cql.tmpl create mode 100644 plugin/storage/cassandra/mappings/v003.cql.tmpl create mode 100644 plugin/storage/cassandra/mappings/v004.cql.tmpl diff --git a/plugin/storage/cassandra/mappings/connection.go b/plugin/storage/cassandra/mappings/connection.go new file mode 100644 index 00000000000..474b17ae6ff --- /dev/null +++ b/plugin/storage/cassandra/mappings/connection.go @@ -0,0 +1,35 @@ +package main2fefwsef + +import ( + "fmt" + "log" + + "github.com/gocql/gocql" +) + +func main() { + // Replace with your Cassandra contact points + cluster := gocql.NewCluster("127.0.0.1") + cluster.Keyspace = "jaeger_v1_test" // Optional: Set default keyspace + session, err := cluster.CreateSession() + if err != nil { + log.Fatal(err) + } + defer session.Close() + + // Your schema creation logic goes here + createSamplingProbabilities := ` + CREATE TABLE IF NOT EXISTS jaeger_v1_test.sampling_probabilities ( + bucket int, + ts timeuuid, + hostname text, + probabilities text, + PRIMARY KEY(bucket, ts) + ) WITH CLUSTERING ORDER BY (ts desc);` + if err := session.Query(createSamplingProbabilities).Exec(); err != nil { + log.Fatalf("Failed to create sampling_probabilities table: %v", err) + } else { + fmt.Println("Table sampling_probabilities created successfully.") + } + +} diff --git a/plugin/storage/cassandra/mappings/mapping.go b/plugin/storage/cassandra/mappings/mapping.go new file mode 100644 index 00000000000..eb19e90f9cc --- /dev/null +++ b/plugin/storage/cassandra/mappings/mapping.go @@ -0,0 +1,231 @@ +package main + +import ( + "bytes" + "errors" + "fmt" + "github.com/gocql/gocql" + "os" + "regexp" + "strconv" + "strings" + "text/template" +) + +type MappingBuilder struct { + cas_version uint + Mode string + Datacentre string + Keyspace string + Replication string + TraceTTL int + DependenciesTTL int + CompactionWindowSize int + CompactionWindowUnit string +} + +func (mb *MappingBuilder) renderTemplate(templatePath string) (string, error) { + tmpl, err := template.ParseFiles(templatePath) + if err != nil { + return "", err + } + + var renderedOutput bytes.Buffer + if err := tmpl.Execute(&renderedOutput, mb); err != nil { + return "", err + } + + return renderedOutput.String(), nil +} + +func RenderCQLTemplate(params MappingBuilder, cqlOutput string) (string, error) { + + commentRegex := regexp.MustCompile(`--.*`) + cqlOutput = commentRegex.ReplaceAllString(cqlOutput, "") + + lines := strings.Split(cqlOutput, "\n") + var filteredLines []string + + for _, line := range lines { + if strings.TrimSpace(line) != "" { + filteredLines = append(filteredLines, line) + } + } + + cqlOutput = strings.Join(filteredLines, "\n") + + replacements := map[string]string{ + "${keyspace}": params.Keyspace, + "${replication}": params.Replication, + "${trace_ttl}": fmt.Sprintf("%d", params.TraceTTL), + "${dependencies_ttl}": fmt.Sprintf("%d", params.DependenciesTTL), + "${compaction_window_size}": fmt.Sprintf("%d", params.CompactionWindowSize), + "${compaction_window_unit}": params.CompactionWindowUnit, + } + + for placeholder, value := range replacements { + cqlOutput = strings.ReplaceAll(cqlOutput, placeholder, value) + } + + return cqlOutput, nil +} + +func getEnv(key, defaultValue string) string { + if value, exists := os.LookupEnv(key); exists { + return value + } + return defaultValue +} + +func isValidKeyspace(keyspace string) bool { + match, _ := regexp.MatchString(`^[a-zA-Z0-9_]+$`, keyspace) + return match +} + +func getCompactionWindow(traceTTL int, compactionWindow string) (int, string, error) { + var compactionWindowSize int + var compactionWindowUnit string + + if compactionWindow != "" { + // Check the format of the compaction window + matched, err := regexp.MatchString(`^[0-9]+[mhd]$`, compactionWindow) + if err != nil { + return 0, "", err + } + + if matched { + // Extract size and unit + numStr := strings.TrimRight(compactionWindow, "mhd") + unitStr := strings.TrimLeft(compactionWindow, numStr) + + // Convert size to integer + compactionWindowSize, err = strconv.Atoi(numStr) + if err != nil { + return 0, "", errors.New("invalid compaction window size format") + } + compactionWindowUnit = unitStr + } else { + return 0, "", errors.New("invalid compaction window size format. Please use numeric value followed by 'm' for minutes, 'h' for hours, or 'd' for days.") + } + } else { + // Calculate default compaction window size and unit + traceTTLMinutes := traceTTL / 60 + compactionWindowSize = (traceTTLMinutes + 30 - 1) / 30 + compactionWindowUnit = "m" + } + + // Map the unit + switch compactionWindowUnit { + case "m": + compactionWindowUnit = "MINUTES" + case "h": + compactionWindowUnit = "HOURS" + case "d": + compactionWindowUnit = "DAYS" + default: + return 0, "", errors.New("invalid compaction window unit") + } + + return compactionWindowSize, compactionWindowUnit, nil +} + +func (mb *MappingBuilder) GetSpanServiceMappings() (spanMapping string, serviceMapping string, err error) { + traceTTL, _ := strconv.Atoi(getEnv("TRACE_TTL", "172800")) + dependenciesTTL, _ := strconv.Atoi(getEnv("DEPENDENCIES_TTL", "0")) + cas_version := getEnv("VERSION", "4") + //template := os.Args[1] + var template string + var cqlOutput string + if template == "" { + switch cas_version { + case "3": + template = "./v003.cql.tmpl" + case "4": + template = "./v004.cql.tmpl" + default: + template = "./v004.cql.tmpl" + } + mode := getEnv("MODE", "") + + if mode == "" { + fmt.Println("missing MODE parameter") + return + } + var datacentre, replications string + //var ReplicationFactor int + if mode == "prod" { + datacentre = getEnv("DATACENTRE", "") + if datacentre == "" { + fmt.Println("missing DATACENTRE parameter for prod mode") + return + } + replicationFactor, _ := strconv.Atoi(getEnv("REPLICATION_FACTOR", "2")) + replications = fmt.Sprintf("{'class': 'NetworkTopologyStrategy', '%s': %d}", datacentre, replicationFactor) + } else if mode == "test" { + datacentre = getEnv("DATACENTRE", "") + replicationFactor, _ := strconv.Atoi(getEnv("REPLICATION_FACTOR", "1")) + replications = fmt.Sprintf("{'class': 'SimpleStrategy', 'replication_factor': '%d'}", replicationFactor) + } else { + fmt.Printf("invaild mode: %s, expecting 'prod' or 'test'", mode) + return + } + keyspace := getEnv("KEYSPACE", fmt.Sprintf("jaeger_v1_%s", mode)) + if !isValidKeyspace(keyspace) { + fmt.Printf("invaild characters in KEYSPACE=%s parameter , please use letters, digits, and underscores only", keyspace) + return + } + CompactionWindowSize, compactionWindowUnit, _ := getCompactionWindow(traceTTL, getEnv("COMPACTION_WINDOW_SIZE", "")) + + params := MappingBuilder{ + Mode: mode, + Datacentre: datacentre, + Keyspace: keyspace, + Replication: replications, + TraceTTL: traceTTL, + DependenciesTTL: dependenciesTTL, + CompactionWindowSize: CompactionWindowSize, + CompactionWindowUnit: compactionWindowUnit, + } + // Render the template + cqlOutput, err = params.renderTemplate(template) + if err != nil { + return "", "", err + } + cqlOutput, _ = RenderCQLTemplate(params, cqlOutput) + // Print or return the generated CQL + fmt.Println(cqlOutput) + + } + return cqlOutput, "", err +} + +func main() { + builder := &MappingBuilder{} + + schema, _, err := builder.GetSpanServiceMappings() + if err != nil { + fmt.Println("Error:", err) + } + + cluster := gocql.NewCluster("127.0.0.1") + cluster.Keyspace = "jaeger_v1_test" + session, err := cluster.CreateSession() + if err != nil { + fmt.Println(err) + } + defer session.Close() + + queries := strings.Split(schema, ";") + for _, query := range queries { + trimmedQuery := strings.TrimSpace(query) + if trimmedQuery != "" { + fmt.Println(trimmedQuery) + if err := session.Query(trimmedQuery + ";").Exec(); err != nil { + fmt.Println("Failed to create sampling_probabilities table: %v", err) + } else { + fmt.Println("Table sampling_probabilities created successfully.") + } + } + } + +} diff --git a/plugin/storage/cassandra/mappings/test.go b/plugin/storage/cassandra/mappings/test.go new file mode 100644 index 00000000000..8e5eaf16aab --- /dev/null +++ b/plugin/storage/cassandra/mappings/test.go @@ -0,0 +1,24 @@ +package mainlekfe + +import ( + "fmt" + "strings" +) + +func main() { + template := `CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication};` + + // Define the replacements + replacements := map[string]string{ + "${keyspace}": "my_keyspace", + "${replication}": "{'class': 'SimpleStrategy', 'replication_factor': '1'}", + // Add other placeholders as needed + } + + // Replace each placeholder in the template + for placeholder, value := range replacements { + template = strings.ReplaceAll(template, placeholder, value) + } + + fmt.Println(template) +} diff --git a/plugin/storage/cassandra/mappings/v001.cql.tmpl b/plugin/storage/cassandra/mappings/v001.cql.tmpl new file mode 100644 index 00000000000..ff46dbe2059 --- /dev/null +++ b/plugin/storage/cassandra/mappings/v001.cql.tmpl @@ -0,0 +1,207 @@ +-- +-- Creates Cassandra keyspace with tables for traces and dependencies. +-- +-- Required parameters: +-- +-- keyspace +-- name of the keyspace +-- replication +-- replication strategy for the keyspace, such as +-- for prod environments +-- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' } +-- for test environments +-- {'class': 'SimpleStrategy', 'replication_factor': '1'} +-- trace_ttl +-- default time to live for trace data, in seconds +-- dependencies_ttl +-- default time to live for dependencies data, in seconds (0 for no TTL) +-- +-- Non-configurable settings: +-- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/ +-- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html + +CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication}; + +CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue ( + key text, + value_type text, + value_string text, + value_bool boolean, + value_long bigint, + value_double double, + value_binary blob, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.log ( + ts bigint, // microseconds since epoch + fields list>, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref ( + ref_type text, + trace_id blob, + span_id bigint, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.process ( + service_name text, + tags list>, +); + +-- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID. +-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table". +-- start_time is bigint instead of timestamp as we require microsecond precision +CREATE TABLE IF NOT EXISTS ${keyspace}.traces ( + trace_id blob, + span_id bigint, + span_hash bigint, + parent_id bigint, + operation_name text, + flags int, + start_time bigint, // microseconds since epoch + duration bigint, // microseconds + tags list>, + logs list>, + refs list>, + process frozen, + PRIMARY KEY (trace_id, span_id, span_hash) +) + WITH compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_names ( + service_name text, + PRIMARY KEY (service_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names ( + service_name text, + operation_name text, + PRIMARY KEY ((service_name), operation_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- index of trace IDs by service + operation names, sorted by span start_time. +CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index ( + service_name text, + operation_name text, + start_time bigint, // microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, operation_name), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index ( + service_name text, + bucket int, + start_time bigint, // microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, bucket), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index ( + service_name text, // service name + operation_name text, // operation name, or blank for queries without span name + bucket timestamp, // time bucket, - the start_time of the given span rounded to an hour + duration bigint, // span duration, in microseconds + start_time bigint, // microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id) +) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- a bucketing strategy may have to be added for tag queries +-- we can make this table even better by adding a timestamp to it +CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index ( + service_name text, + tag_key text, + tag_value text, + start_time bigint, // microseconds since epoch + trace_id blob, + span_id bigint, + PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) +) + WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TYPE IF NOT EXISTS ${keyspace}.dependency ( + parent text, + child text, + call_count bigint, +); + +-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data +-- note we have to write ts twice (once as ts_index). This is because we cannot make a SASI index on the primary key +CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies ( + ts timestamp, + ts_index timestamp, + dependencies list>, + PRIMARY KEY (ts) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = ${dependencies_ttl}; + +CREATE CUSTOM INDEX IF NOT EXISTS ON ${keyspace}.dependencies (ts_index) + USING 'org.apache.cassandra.index.sasi.SASIIndex' + WITH OPTIONS = {'mode': 'SPARSE'}; diff --git a/plugin/storage/cassandra/mappings/v002.cql.tmpl b/plugin/storage/cassandra/mappings/v002.cql.tmpl new file mode 100644 index 00000000000..e463864f4ef --- /dev/null +++ b/plugin/storage/cassandra/mappings/v002.cql.tmpl @@ -0,0 +1,203 @@ +-- +-- Creates Cassandra keyspace with tables for traces and dependencies. +-- +-- Required parameters: +-- +-- keyspace +-- name of the keyspace +-- replication +-- replication strategy for the keyspace, such as +-- for prod environments +-- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' } +-- for test environments +-- {'class': 'SimpleStrategy', 'replication_factor': '1'} +-- trace_ttl +-- default time to live for trace data, in seconds +-- dependencies_ttl +-- default time to live for dependencies data, in seconds (0 for no TTL) +-- +-- Non-configurable settings: +-- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/ +-- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html + +CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication}; + +CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue ( + key text, + value_type text, + value_string text, + value_bool boolean, + value_long bigint, + value_double double, + value_binary blob, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.log ( + ts bigint, // microseconds since epoch + fields list>, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref ( + ref_type text, + trace_id blob, + span_id bigint, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.process ( + service_name text, + tags list>, +); + +-- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID. +-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table". +-- start_time is bigint instead of timestamp as we require microsecond precision +CREATE TABLE IF NOT EXISTS ${keyspace}.traces ( + trace_id blob, + span_id bigint, + span_hash bigint, + parent_id bigint, + operation_name text, + flags int, + start_time bigint, // microseconds since epoch + duration bigint, // microseconds + tags list>, + logs list>, + refs list>, + process frozen, + PRIMARY KEY (trace_id, span_id, span_hash) +) + WITH compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_names ( + service_name text, + PRIMARY KEY (service_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names ( + service_name text, + operation_name text, + PRIMARY KEY ((service_name), operation_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- index of trace IDs by service + operation names, sorted by span start_time. +CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index ( + service_name text, + operation_name text, + start_time bigint, // microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, operation_name), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index ( + service_name text, + bucket int, + start_time bigint, // microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, bucket), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index ( + service_name text, // service name + operation_name text, // operation name, or blank for queries without span name + bucket timestamp, // time bucket, - the start_time of the given span rounded to an hour + duration bigint, // span duration, in microseconds + start_time bigint, // microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id) +) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- a bucketing strategy may have to be added for tag queries +-- we can make this table even better by adding a timestamp to it +CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index ( + service_name text, + tag_key text, + tag_value text, + start_time bigint, // microseconds since epoch + trace_id blob, + span_id bigint, + PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) +) + WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TYPE IF NOT EXISTS ${keyspace}.dependency ( + parent text, + child text, + call_count bigint, + source text, +); + +-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data +CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies_v2 ( + ts_bucket timestamp, + ts timestamp, + dependencies list>, + PRIMARY KEY (ts_bucket, ts) +) WITH CLUSTERING ORDER BY (ts DESC) + AND compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = ${dependencies_ttl}; diff --git a/plugin/storage/cassandra/mappings/v003.cql.tmpl b/plugin/storage/cassandra/mappings/v003.cql.tmpl new file mode 100644 index 00000000000..6ed90e9dee9 --- /dev/null +++ b/plugin/storage/cassandra/mappings/v003.cql.tmpl @@ -0,0 +1,229 @@ +-- +-- Creates Cassandra keyspace with tables for traces and dependencies. +-- +-- Required parameters: +-- +-- keyspace +-- name of the keyspace +-- replication +-- replication strategy for the keyspace, such as +-- for prod environments +-- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' } +-- for test environments +-- {'class': 'SimpleStrategy', 'replication_factor': '1'} +-- trace_ttl +-- default time to live for trace data, in seconds +-- dependencies_ttl +-- default time to live for dependencies data, in seconds (0 for no TTL) +-- +-- Non-configurable settings: +-- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/ +-- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html + +CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication}; + +CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue ( + key text, + value_type text, + value_string text, + value_bool boolean, + value_long bigint, + value_double double, + value_binary blob, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.log ( + ts bigint, // microseconds since epoch + fields list>, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref ( + ref_type text, + trace_id blob, + span_id bigint, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.process ( + service_name text, + tags list>, +); + +-- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID. +-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table". +-- start_time is bigint instead of timestamp as we require microsecond precision +CREATE TABLE IF NOT EXISTS ${keyspace}.traces ( + trace_id blob, + span_id bigint, + span_hash bigint, + parent_id bigint, + operation_name text, + flags int, + start_time bigint, // microseconds since epoch + duration bigint, // microseconds + tags list>, + logs list>, + refs list>, + process frozen, + PRIMARY KEY (trace_id, span_id, span_hash) +) + WITH compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_names ( + service_name text, + PRIMARY KEY (service_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names_v2 ( + service_name text, + span_kind text, + operation_name text, + PRIMARY KEY ((service_name), span_kind, operation_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- index of trace IDs by service + operation names, sorted by span start_time. +CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index ( + service_name text, + operation_name text, + start_time bigint, // microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, operation_name), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index ( + service_name text, + bucket int, + start_time bigint, // microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, bucket), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index ( + service_name text, // service name + operation_name text, // operation name, or blank for queries without span name + bucket timestamp, // time bucket, - the start_time of the given span rounded to an hour + duration bigint, // span duration, in microseconds + start_time bigint, // microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id) +) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- a bucketing strategy may have to be added for tag queries +-- we can make this table even better by adding a timestamp to it +CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index ( + service_name text, + tag_key text, + tag_value text, + start_time bigint, // microseconds since epoch + trace_id blob, + span_id bigint, + PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) +) + WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TYPE IF NOT EXISTS ${keyspace}.dependency ( + parent text, + child text, + call_count bigint, + source text, +); + +-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data +CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies_v2 ( + ts_bucket timestamp, + ts timestamp, + dependencies list>, + PRIMARY KEY (ts_bucket, ts) +) WITH CLUSTERING ORDER BY (ts DESC) + AND compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = ${dependencies_ttl}; + +-- adaptive sampling tables +-- ./plugin/storage/cassandra/samplingstore/storage.go +CREATE TABLE IF NOT EXISTS ${keyspace}.operation_throughput ( + bucket int, + ts timeuuid, + throughput text, + PRIMARY KEY(bucket, ts) +) WITH CLUSTERING ORDER BY (ts desc); + +CREATE TABLE IF NOT EXISTS ${keyspace}.sampling_probabilities ( + bucket int, + ts timeuuid, + hostname text, + probabilities text, + PRIMARY KEY(bucket, ts) +) WITH CLUSTERING ORDER BY (ts desc); + +-- distributed lock +-- ./plugin/pkg/distributedlock/cassandra/lock.go +CREATE TABLE IF NOT EXISTS ${keyspace}.leases ( + name text, + owner text, + PRIMARY KEY (name) +); \ No newline at end of file diff --git a/plugin/storage/cassandra/mappings/v004.cql.tmpl b/plugin/storage/cassandra/mappings/v004.cql.tmpl new file mode 100644 index 00000000000..27f9bc10764 --- /dev/null +++ b/plugin/storage/cassandra/mappings/v004.cql.tmpl @@ -0,0 +1,222 @@ +-- +-- Creates Cassandra keyspace with tables for traces and dependencies. +-- +-- Required parameters: +-- +-- keyspace +-- name of the keyspace +-- replication +-- replication strategy for the keyspace, such as +-- for prod environments +-- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' } +-- for test environments +-- {'class': 'SimpleStrategy', 'replication_factor': '1'} +-- trace_ttl +-- default time to live for trace data, in seconds +-- dependencies_ttl +-- default time to live for dependencies data, in seconds (0 for no TTL) +-- +-- Non-configurable settings: +-- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/ +-- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html + +CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication}; + +CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue ( + key text, + value_type text, + value_string text, + value_bool boolean, + value_long bigint, + value_double double, + value_binary blob +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.log ( + ts bigint, -- microseconds since epoch + fields frozen>> +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref ( + ref_type text, + trace_id blob, + span_id bigint +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.process ( + service_name text, + tags frozen>> +); + +-- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID. +-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table". +-- start_time is bigint instead of timestamp as we require microsecond precision +CREATE TABLE IF NOT EXISTS ${keyspace}.traces ( + trace_id blob, + span_id bigint, + span_hash bigint, + parent_id bigint, + operation_name text, + flags int, + start_time bigint, -- microseconds since epoch + duration bigint, -- microseconds + tags list>, + logs list>, + refs list>, + process frozen, + PRIMARY KEY (trace_id, span_id, span_hash) +) + WITH compaction = { + 'compaction_window_size': '${compaction_window_size}', + 'compaction_window_unit': '${compaction_window_unit}', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_names ( + service_name text, + PRIMARY KEY (service_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names_v2 ( + service_name text, + span_kind text, + operation_name text, + PRIMARY KEY ((service_name), span_kind, operation_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- index of trace IDs by service + operation names, sorted by span start_time. +CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index ( + service_name text, + operation_name text, + start_time bigint, -- microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, operation_name), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index ( + service_name text, + bucket int, + start_time bigint, -- microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, bucket), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index ( + service_name text, -- service name + operation_name text, -- operation name, or blank for queries without span name + bucket timestamp, -- time bucket, - the start_time of the given span rounded to an hour + duration bigint, -- span duration, in microseconds + start_time bigint, -- microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id) +) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- a bucketing strategy may have to be added for tag queries +-- we can make this table even better by adding a timestamp to it +CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index ( + service_name text, + tag_key text, + tag_value text, + start_time bigint, -- microseconds since epoch + trace_id blob, + span_id bigint, + PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) +) + WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TYPE IF NOT EXISTS ${keyspace}.dependency ( + parent text, + child text, + call_count bigint, + source text +); + +-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data +CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies_v2 ( + ts_bucket timestamp, + ts timestamp, + dependencies list>, + PRIMARY KEY (ts_bucket, ts) +) WITH CLUSTERING ORDER BY (ts DESC) + AND compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = ${dependencies_ttl}; + +-- adaptive sampling tables +-- ./plugin/storage/cassandra/samplingstore/storage.go +CREATE TABLE IF NOT EXISTS ${keyspace}.operation_throughput ( + bucket int, + ts timeuuid, + throughput text, + PRIMARY KEY(bucket, ts) +) WITH CLUSTERING ORDER BY (ts desc); + +CREATE TABLE IF NOT EXISTS ${keyspace}.sampling_probabilities ( + bucket int, + ts timeuuid, + hostname text, + probabilities text, + PRIMARY KEY(bucket, ts) +) WITH CLUSTERING ORDER BY (ts desc); + +-- distributed lock +-- ./plugin/pkg/distributedlock/cassandra/lock.go +CREATE TABLE IF NOT EXISTS ${keyspace}.leases ( + name text, + owner text, + PRIMARY KEY (name) +); \ No newline at end of file From 4a17743d251b58ffab3a8d4f382fdba05326bf0e Mon Sep 17 00:00:00 2001 From: mehul Date: Mon, 7 Oct 2024 23:24:55 +0530 Subject: [PATCH 2/4] fix Signed-off-by: mehul --- plugin/storage/cassandra/factory.go | 2 + .../{connection.go => connection.txt} | 0 plugin/storage/cassandra/mappings/mapping.go | 59 +++++++++++++------ plugin/storage/cassandra/mappings/test.go | 24 -------- 4 files changed, 43 insertions(+), 42 deletions(-) rename plugin/storage/cassandra/mappings/{connection.go => connection.txt} (100%) delete mode 100644 plugin/storage/cassandra/mappings/test.go diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index e26c215bac7..79257699b78 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -30,6 +30,7 @@ import ( "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/plugin/storage/cassandra/mappings" ) const ( @@ -141,6 +142,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) if err != nil { return err } + primarySession.SchemaInit() f.primarySession = primarySession if f.archiveConfig != nil { diff --git a/plugin/storage/cassandra/mappings/connection.go b/plugin/storage/cassandra/mappings/connection.txt similarity index 100% rename from plugin/storage/cassandra/mappings/connection.go rename to plugin/storage/cassandra/mappings/connection.txt diff --git a/plugin/storage/cassandra/mappings/mapping.go b/plugin/storage/cassandra/mappings/mapping.go index eb19e90f9cc..341226ad622 100644 --- a/plugin/storage/cassandra/mappings/mapping.go +++ b/plugin/storage/cassandra/mappings/mapping.go @@ -1,19 +1,21 @@ -package main +package mapping import ( "bytes" "errors" "fmt" - "github.com/gocql/gocql" "os" "regexp" "strconv" "strings" "text/template" + + //"github.com/gocql/gocql" + //"github.com/jaegertracing/jaeger/pkg/cassandra/config" + "github.com/jaegertracing/jaeger/pkg/cassandra" ) type MappingBuilder struct { - cas_version uint Mode string Datacentre string Keyspace string @@ -105,7 +107,7 @@ func getCompactionWindow(traceTTL int, compactionWindow string) (int, string, er } compactionWindowUnit = unitStr } else { - return 0, "", errors.New("invalid compaction window size format. Please use numeric value followed by 'm' for minutes, 'h' for hours, or 'd' for days.") + return 0, "", errors.New("invalid compaction window size format. Please use numeric value followed by 'm' for minutes, 'h' for hours, or 'd' for days") } } else { // Calculate default compaction window size and unit @@ -199,33 +201,54 @@ func (mb *MappingBuilder) GetSpanServiceMappings() (spanMapping string, serviceM return cqlOutput, "", err } -func main() { +// func main() { + +// builder := &MappingBuilder{} +// schema, _, err := builder.GetSpanServiceMappings() +// if err != nil { +// fmt.Println("Error:", err) +// } + + +// cluster := gocql.NewCluster("127.0.0.1") +// cluster.Keyspace = "jaeger_v1_test" +// session, err := cluster.CreateSession() +// if err != nil { +// fmt.Println(err) +// } +// defer session.Close() + +// queries := strings.Split(schema, ";") +// for _, query := range queries { +// trimmedQuery := strings.TrimSpace(query) +// if trimmedQuery != "" { +// fmt.Println(trimmedQuery) +// if err := session.Query(trimmedQuery + ";").Exec(); err != nil { +// fmt.Println("Failed to create sampling_probabilities table: %v", err) +// } else { +// fmt.Println("Table sampling_probabilities created successfully.") +// } +// } +// } + +// } + +func (c cassandra.Session) SchemaInit() { builder := &MappingBuilder{} - schema, _, err := builder.GetSpanServiceMappings() if err != nil { fmt.Println("Error:", err) } - - cluster := gocql.NewCluster("127.0.0.1") - cluster.Keyspace = "jaeger_v1_test" - session, err := cluster.CreateSession() - if err != nil { - fmt.Println(err) - } - defer session.Close() - queries := strings.Split(schema, ";") for _, query := range queries { trimmedQuery := strings.TrimSpace(query) if trimmedQuery != "" { fmt.Println(trimmedQuery) - if err := session.Query(trimmedQuery + ";").Exec(); err != nil { + if err := c.Query(trimmedQuery + ";").Exec(); err != nil { fmt.Println("Failed to create sampling_probabilities table: %v", err) } else { fmt.Println("Table sampling_probabilities created successfully.") } } } - -} +} \ No newline at end of file diff --git a/plugin/storage/cassandra/mappings/test.go b/plugin/storage/cassandra/mappings/test.go deleted file mode 100644 index 8e5eaf16aab..00000000000 --- a/plugin/storage/cassandra/mappings/test.go +++ /dev/null @@ -1,24 +0,0 @@ -package mainlekfe - -import ( - "fmt" - "strings" -) - -func main() { - template := `CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication};` - - // Define the replacements - replacements := map[string]string{ - "${keyspace}": "my_keyspace", - "${replication}": "{'class': 'SimpleStrategy', 'replication_factor': '1'}", - // Add other placeholders as needed - } - - // Replace each placeholder in the template - for placeholder, value := range replacements { - template = strings.ReplaceAll(template, placeholder, value) - } - - fmt.Println(template) -} From 51922fcd12d01213bac76e9b09ee95d67b7b03a8 Mon Sep 17 00:00:00 2001 From: mehul Date: Tue, 8 Oct 2024 01:43:54 +0530 Subject: [PATCH 3/4] added schema init in config.go Signed-off-by: mehul --- pkg/cassandra/config/config.go | 2 + plugin/storage/cassandra/factory.go | 3 +- .../storage/cassandra/mappings/connection.txt | 35 --- .../storage/cassandra/mappings/v001.cql.tmpl | 207 ---------------- .../storage/cassandra/mappings/v002.cql.tmpl | 203 ---------------- .../storage/cassandra/mappings/v003.cql.tmpl | 229 ------------------ .../storage/cassandra/mappings/v004.cql.tmpl | 222 ----------------- .../{mappings/mapping.go => schema/create.go} | 55 +---- 8 files changed, 10 insertions(+), 946 deletions(-) delete mode 100644 plugin/storage/cassandra/mappings/connection.txt delete mode 100644 plugin/storage/cassandra/mappings/v001.cql.tmpl delete mode 100644 plugin/storage/cassandra/mappings/v002.cql.tmpl delete mode 100644 plugin/storage/cassandra/mappings/v003.cql.tmpl delete mode 100644 plugin/storage/cassandra/mappings/v004.cql.tmpl rename plugin/storage/cassandra/{mappings/mapping.go => schema/create.go} (81%) diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index 6bab9c75da4..f0b5bf51a36 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -15,6 +15,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/cassandra" gocqlw "github.com/jaegertracing/jaeger/pkg/cassandra/gocql" + "github.com/jaegertracing/jaeger/plugin/storage/cassandra/schema" ) // Configuration describes the configuration properties needed to connect to a Cassandra cluster. @@ -144,6 +145,7 @@ func (c *Configuration) NewSession() (cassandra.Session, error) { if err != nil { return nil, err } + mappings.SchemaInit(session) return gocqlw.WrapCQLSession(session), nil } diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 79257699b78..9809f736fd4 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -30,7 +30,6 @@ import ( "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/samplingstore" "github.com/jaegertracing/jaeger/storage/spanstore" - "github.com/jaegertracing/jaeger/plugin/storage/cassandra/mappings" ) const ( @@ -142,7 +141,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) if err != nil { return err } - primarySession.SchemaInit() + f.primarySession = primarySession if f.archiveConfig != nil { diff --git a/plugin/storage/cassandra/mappings/connection.txt b/plugin/storage/cassandra/mappings/connection.txt deleted file mode 100644 index 474b17ae6ff..00000000000 --- a/plugin/storage/cassandra/mappings/connection.txt +++ /dev/null @@ -1,35 +0,0 @@ -package main2fefwsef - -import ( - "fmt" - "log" - - "github.com/gocql/gocql" -) - -func main() { - // Replace with your Cassandra contact points - cluster := gocql.NewCluster("127.0.0.1") - cluster.Keyspace = "jaeger_v1_test" // Optional: Set default keyspace - session, err := cluster.CreateSession() - if err != nil { - log.Fatal(err) - } - defer session.Close() - - // Your schema creation logic goes here - createSamplingProbabilities := ` - CREATE TABLE IF NOT EXISTS jaeger_v1_test.sampling_probabilities ( - bucket int, - ts timeuuid, - hostname text, - probabilities text, - PRIMARY KEY(bucket, ts) - ) WITH CLUSTERING ORDER BY (ts desc);` - if err := session.Query(createSamplingProbabilities).Exec(); err != nil { - log.Fatalf("Failed to create sampling_probabilities table: %v", err) - } else { - fmt.Println("Table sampling_probabilities created successfully.") - } - -} diff --git a/plugin/storage/cassandra/mappings/v001.cql.tmpl b/plugin/storage/cassandra/mappings/v001.cql.tmpl deleted file mode 100644 index ff46dbe2059..00000000000 --- a/plugin/storage/cassandra/mappings/v001.cql.tmpl +++ /dev/null @@ -1,207 +0,0 @@ --- --- Creates Cassandra keyspace with tables for traces and dependencies. --- --- Required parameters: --- --- keyspace --- name of the keyspace --- replication --- replication strategy for the keyspace, such as --- for prod environments --- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' } --- for test environments --- {'class': 'SimpleStrategy', 'replication_factor': '1'} --- trace_ttl --- default time to live for trace data, in seconds --- dependencies_ttl --- default time to live for dependencies data, in seconds (0 for no TTL) --- --- Non-configurable settings: --- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/ --- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html - -CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication}; - -CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue ( - key text, - value_type text, - value_string text, - value_bool boolean, - value_long bigint, - value_double double, - value_binary blob, -); - -CREATE TYPE IF NOT EXISTS ${keyspace}.log ( - ts bigint, // microseconds since epoch - fields list>, -); - -CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref ( - ref_type text, - trace_id blob, - span_id bigint, -); - -CREATE TYPE IF NOT EXISTS ${keyspace}.process ( - service_name text, - tags list>, -); - --- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID. --- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table". --- start_time is bigint instead of timestamp as we require microsecond precision -CREATE TABLE IF NOT EXISTS ${keyspace}.traces ( - trace_id blob, - span_id bigint, - span_hash bigint, - parent_id bigint, - operation_name text, - flags int, - start_time bigint, // microseconds since epoch - duration bigint, // microseconds - tags list>, - logs list>, - refs list>, - process frozen, - PRIMARY KEY (trace_id, span_id, span_hash) -) - WITH compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.service_names ( - service_name text, - PRIMARY KEY (service_name) -) - WITH compaction = { - 'min_threshold': '4', - 'max_threshold': '32', - 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names ( - service_name text, - operation_name text, - PRIMARY KEY ((service_name), operation_name) -) - WITH compaction = { - 'min_threshold': '4', - 'max_threshold': '32', - 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - --- index of trace IDs by service + operation names, sorted by span start_time. -CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index ( - service_name text, - operation_name text, - start_time bigint, // microseconds since epoch - trace_id blob, - PRIMARY KEY ((service_name, operation_name), start_time) -) WITH CLUSTERING ORDER BY (start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index ( - service_name text, - bucket int, - start_time bigint, // microseconds since epoch - trace_id blob, - PRIMARY KEY ((service_name, bucket), start_time) -) WITH CLUSTERING ORDER BY (start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index ( - service_name text, // service name - operation_name text, // operation name, or blank for queries without span name - bucket timestamp, // time bucket, - the start_time of the given span rounded to an hour - duration bigint, // span duration, in microseconds - start_time bigint, // microseconds since epoch - trace_id blob, - PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id) -) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - --- a bucketing strategy may have to be added for tag queries --- we can make this table even better by adding a timestamp to it -CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index ( - service_name text, - tag_key text, - tag_value text, - start_time bigint, // microseconds since epoch - trace_id blob, - span_id bigint, - PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) -) - WITH CLUSTERING ORDER BY (start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TYPE IF NOT EXISTS ${keyspace}.dependency ( - parent text, - child text, - call_count bigint, -); - --- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data --- note we have to write ts twice (once as ts_index). This is because we cannot make a SASI index on the primary key -CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies ( - ts timestamp, - ts_index timestamp, - dependencies list>, - PRIMARY KEY (ts) -) - WITH compaction = { - 'min_threshold': '4', - 'max_threshold': '32', - 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' - } - AND default_time_to_live = ${dependencies_ttl}; - -CREATE CUSTOM INDEX IF NOT EXISTS ON ${keyspace}.dependencies (ts_index) - USING 'org.apache.cassandra.index.sasi.SASIIndex' - WITH OPTIONS = {'mode': 'SPARSE'}; diff --git a/plugin/storage/cassandra/mappings/v002.cql.tmpl b/plugin/storage/cassandra/mappings/v002.cql.tmpl deleted file mode 100644 index e463864f4ef..00000000000 --- a/plugin/storage/cassandra/mappings/v002.cql.tmpl +++ /dev/null @@ -1,203 +0,0 @@ --- --- Creates Cassandra keyspace with tables for traces and dependencies. --- --- Required parameters: --- --- keyspace --- name of the keyspace --- replication --- replication strategy for the keyspace, such as --- for prod environments --- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' } --- for test environments --- {'class': 'SimpleStrategy', 'replication_factor': '1'} --- trace_ttl --- default time to live for trace data, in seconds --- dependencies_ttl --- default time to live for dependencies data, in seconds (0 for no TTL) --- --- Non-configurable settings: --- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/ --- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html - -CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication}; - -CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue ( - key text, - value_type text, - value_string text, - value_bool boolean, - value_long bigint, - value_double double, - value_binary blob, -); - -CREATE TYPE IF NOT EXISTS ${keyspace}.log ( - ts bigint, // microseconds since epoch - fields list>, -); - -CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref ( - ref_type text, - trace_id blob, - span_id bigint, -); - -CREATE TYPE IF NOT EXISTS ${keyspace}.process ( - service_name text, - tags list>, -); - --- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID. --- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table". --- start_time is bigint instead of timestamp as we require microsecond precision -CREATE TABLE IF NOT EXISTS ${keyspace}.traces ( - trace_id blob, - span_id bigint, - span_hash bigint, - parent_id bigint, - operation_name text, - flags int, - start_time bigint, // microseconds since epoch - duration bigint, // microseconds - tags list>, - logs list>, - refs list>, - process frozen, - PRIMARY KEY (trace_id, span_id, span_hash) -) - WITH compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.service_names ( - service_name text, - PRIMARY KEY (service_name) -) - WITH compaction = { - 'min_threshold': '4', - 'max_threshold': '32', - 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names ( - service_name text, - operation_name text, - PRIMARY KEY ((service_name), operation_name) -) - WITH compaction = { - 'min_threshold': '4', - 'max_threshold': '32', - 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - --- index of trace IDs by service + operation names, sorted by span start_time. -CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index ( - service_name text, - operation_name text, - start_time bigint, // microseconds since epoch - trace_id blob, - PRIMARY KEY ((service_name, operation_name), start_time) -) WITH CLUSTERING ORDER BY (start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index ( - service_name text, - bucket int, - start_time bigint, // microseconds since epoch - trace_id blob, - PRIMARY KEY ((service_name, bucket), start_time) -) WITH CLUSTERING ORDER BY (start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index ( - service_name text, // service name - operation_name text, // operation name, or blank for queries without span name - bucket timestamp, // time bucket, - the start_time of the given span rounded to an hour - duration bigint, // span duration, in microseconds - start_time bigint, // microseconds since epoch - trace_id blob, - PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id) -) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - --- a bucketing strategy may have to be added for tag queries --- we can make this table even better by adding a timestamp to it -CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index ( - service_name text, - tag_key text, - tag_value text, - start_time bigint, // microseconds since epoch - trace_id blob, - span_id bigint, - PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) -) - WITH CLUSTERING ORDER BY (start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TYPE IF NOT EXISTS ${keyspace}.dependency ( - parent text, - child text, - call_count bigint, - source text, -); - --- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data -CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies_v2 ( - ts_bucket timestamp, - ts timestamp, - dependencies list>, - PRIMARY KEY (ts_bucket, ts) -) WITH CLUSTERING ORDER BY (ts DESC) - AND compaction = { - 'min_threshold': '4', - 'max_threshold': '32', - 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' - } - AND default_time_to_live = ${dependencies_ttl}; diff --git a/plugin/storage/cassandra/mappings/v003.cql.tmpl b/plugin/storage/cassandra/mappings/v003.cql.tmpl deleted file mode 100644 index 6ed90e9dee9..00000000000 --- a/plugin/storage/cassandra/mappings/v003.cql.tmpl +++ /dev/null @@ -1,229 +0,0 @@ --- --- Creates Cassandra keyspace with tables for traces and dependencies. --- --- Required parameters: --- --- keyspace --- name of the keyspace --- replication --- replication strategy for the keyspace, such as --- for prod environments --- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' } --- for test environments --- {'class': 'SimpleStrategy', 'replication_factor': '1'} --- trace_ttl --- default time to live for trace data, in seconds --- dependencies_ttl --- default time to live for dependencies data, in seconds (0 for no TTL) --- --- Non-configurable settings: --- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/ --- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html - -CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication}; - -CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue ( - key text, - value_type text, - value_string text, - value_bool boolean, - value_long bigint, - value_double double, - value_binary blob, -); - -CREATE TYPE IF NOT EXISTS ${keyspace}.log ( - ts bigint, // microseconds since epoch - fields list>, -); - -CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref ( - ref_type text, - trace_id blob, - span_id bigint, -); - -CREATE TYPE IF NOT EXISTS ${keyspace}.process ( - service_name text, - tags list>, -); - --- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID. --- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table". --- start_time is bigint instead of timestamp as we require microsecond precision -CREATE TABLE IF NOT EXISTS ${keyspace}.traces ( - trace_id blob, - span_id bigint, - span_hash bigint, - parent_id bigint, - operation_name text, - flags int, - start_time bigint, // microseconds since epoch - duration bigint, // microseconds - tags list>, - logs list>, - refs list>, - process frozen, - PRIMARY KEY (trace_id, span_id, span_hash) -) - WITH compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.service_names ( - service_name text, - PRIMARY KEY (service_name) -) - WITH compaction = { - 'min_threshold': '4', - 'max_threshold': '32', - 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names_v2 ( - service_name text, - span_kind text, - operation_name text, - PRIMARY KEY ((service_name), span_kind, operation_name) -) - WITH compaction = { - 'min_threshold': '4', - 'max_threshold': '32', - 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - --- index of trace IDs by service + operation names, sorted by span start_time. -CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index ( - service_name text, - operation_name text, - start_time bigint, // microseconds since epoch - trace_id blob, - PRIMARY KEY ((service_name, operation_name), start_time) -) WITH CLUSTERING ORDER BY (start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index ( - service_name text, - bucket int, - start_time bigint, // microseconds since epoch - trace_id blob, - PRIMARY KEY ((service_name, bucket), start_time) -) WITH CLUSTERING ORDER BY (start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index ( - service_name text, // service name - operation_name text, // operation name, or blank for queries without span name - bucket timestamp, // time bucket, - the start_time of the given span rounded to an hour - duration bigint, // span duration, in microseconds - start_time bigint, // microseconds since epoch - trace_id blob, - PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id) -) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - --- a bucketing strategy may have to be added for tag queries --- we can make this table even better by adding a timestamp to it -CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index ( - service_name text, - tag_key text, - tag_value text, - start_time bigint, // microseconds since epoch - trace_id blob, - span_id bigint, - PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) -) - WITH CLUSTERING ORDER BY (start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND dclocal_read_repair_chance = 0.0 - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TYPE IF NOT EXISTS ${keyspace}.dependency ( - parent text, - child text, - call_count bigint, - source text, -); - --- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data -CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies_v2 ( - ts_bucket timestamp, - ts timestamp, - dependencies list>, - PRIMARY KEY (ts_bucket, ts) -) WITH CLUSTERING ORDER BY (ts DESC) - AND compaction = { - 'min_threshold': '4', - 'max_threshold': '32', - 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' - } - AND default_time_to_live = ${dependencies_ttl}; - --- adaptive sampling tables --- ./plugin/storage/cassandra/samplingstore/storage.go -CREATE TABLE IF NOT EXISTS ${keyspace}.operation_throughput ( - bucket int, - ts timeuuid, - throughput text, - PRIMARY KEY(bucket, ts) -) WITH CLUSTERING ORDER BY (ts desc); - -CREATE TABLE IF NOT EXISTS ${keyspace}.sampling_probabilities ( - bucket int, - ts timeuuid, - hostname text, - probabilities text, - PRIMARY KEY(bucket, ts) -) WITH CLUSTERING ORDER BY (ts desc); - --- distributed lock --- ./plugin/pkg/distributedlock/cassandra/lock.go -CREATE TABLE IF NOT EXISTS ${keyspace}.leases ( - name text, - owner text, - PRIMARY KEY (name) -); \ No newline at end of file diff --git a/plugin/storage/cassandra/mappings/v004.cql.tmpl b/plugin/storage/cassandra/mappings/v004.cql.tmpl deleted file mode 100644 index 27f9bc10764..00000000000 --- a/plugin/storage/cassandra/mappings/v004.cql.tmpl +++ /dev/null @@ -1,222 +0,0 @@ --- --- Creates Cassandra keyspace with tables for traces and dependencies. --- --- Required parameters: --- --- keyspace --- name of the keyspace --- replication --- replication strategy for the keyspace, such as --- for prod environments --- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' } --- for test environments --- {'class': 'SimpleStrategy', 'replication_factor': '1'} --- trace_ttl --- default time to live for trace data, in seconds --- dependencies_ttl --- default time to live for dependencies data, in seconds (0 for no TTL) --- --- Non-configurable settings: --- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/ --- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html - -CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication}; - -CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue ( - key text, - value_type text, - value_string text, - value_bool boolean, - value_long bigint, - value_double double, - value_binary blob -); - -CREATE TYPE IF NOT EXISTS ${keyspace}.log ( - ts bigint, -- microseconds since epoch - fields frozen>> -); - -CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref ( - ref_type text, - trace_id blob, - span_id bigint -); - -CREATE TYPE IF NOT EXISTS ${keyspace}.process ( - service_name text, - tags frozen>> -); - --- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID. --- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table". --- start_time is bigint instead of timestamp as we require microsecond precision -CREATE TABLE IF NOT EXISTS ${keyspace}.traces ( - trace_id blob, - span_id bigint, - span_hash bigint, - parent_id bigint, - operation_name text, - flags int, - start_time bigint, -- microseconds since epoch - duration bigint, -- microseconds - tags list>, - logs list>, - refs list>, - process frozen, - PRIMARY KEY (trace_id, span_id, span_hash) -) - WITH compaction = { - 'compaction_window_size': '${compaction_window_size}', - 'compaction_window_unit': '${compaction_window_unit}', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.service_names ( - service_name text, - PRIMARY KEY (service_name) -) - WITH compaction = { - 'min_threshold': '4', - 'max_threshold': '32', - 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' - } - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names_v2 ( - service_name text, - span_kind text, - operation_name text, - PRIMARY KEY ((service_name), span_kind, operation_name) -) - WITH compaction = { - 'min_threshold': '4', - 'max_threshold': '32', - 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' - } - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - --- index of trace IDs by service + operation names, sorted by span start_time. -CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index ( - service_name text, - operation_name text, - start_time bigint, -- microseconds since epoch - trace_id blob, - PRIMARY KEY ((service_name, operation_name), start_time) -) WITH CLUSTERING ORDER BY (start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index ( - service_name text, - bucket int, - start_time bigint, -- microseconds since epoch - trace_id blob, - PRIMARY KEY ((service_name, bucket), start_time) -) WITH CLUSTERING ORDER BY (start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index ( - service_name text, -- service name - operation_name text, -- operation name, or blank for queries without span name - bucket timestamp, -- time bucket, - the start_time of the given span rounded to an hour - duration bigint, -- span duration, in microseconds - start_time bigint, -- microseconds since epoch - trace_id blob, - PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id) -) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - --- a bucketing strategy may have to be added for tag queries --- we can make this table even better by adding a timestamp to it -CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index ( - service_name text, - tag_key text, - tag_value text, - start_time bigint, -- microseconds since epoch - trace_id blob, - span_id bigint, - PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) -) - WITH CLUSTERING ORDER BY (start_time DESC) - AND compaction = { - 'compaction_window_size': '1', - 'compaction_window_unit': 'HOURS', - 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' - } - AND default_time_to_live = ${trace_ttl} - AND speculative_retry = 'NONE' - AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes - -CREATE TYPE IF NOT EXISTS ${keyspace}.dependency ( - parent text, - child text, - call_count bigint, - source text -); - --- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data -CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies_v2 ( - ts_bucket timestamp, - ts timestamp, - dependencies list>, - PRIMARY KEY (ts_bucket, ts) -) WITH CLUSTERING ORDER BY (ts DESC) - AND compaction = { - 'min_threshold': '4', - 'max_threshold': '32', - 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' - } - AND default_time_to_live = ${dependencies_ttl}; - --- adaptive sampling tables --- ./plugin/storage/cassandra/samplingstore/storage.go -CREATE TABLE IF NOT EXISTS ${keyspace}.operation_throughput ( - bucket int, - ts timeuuid, - throughput text, - PRIMARY KEY(bucket, ts) -) WITH CLUSTERING ORDER BY (ts desc); - -CREATE TABLE IF NOT EXISTS ${keyspace}.sampling_probabilities ( - bucket int, - ts timeuuid, - hostname text, - probabilities text, - PRIMARY KEY(bucket, ts) -) WITH CLUSTERING ORDER BY (ts desc); - --- distributed lock --- ./plugin/pkg/distributedlock/cassandra/lock.go -CREATE TABLE IF NOT EXISTS ${keyspace}.leases ( - name text, - owner text, - PRIMARY KEY (name) -); \ No newline at end of file diff --git a/plugin/storage/cassandra/mappings/mapping.go b/plugin/storage/cassandra/schema/create.go similarity index 81% rename from plugin/storage/cassandra/mappings/mapping.go rename to plugin/storage/cassandra/schema/create.go index 341226ad622..8abee2951fe 100644 --- a/plugin/storage/cassandra/mappings/mapping.go +++ b/plugin/storage/cassandra/schema/create.go @@ -1,4 +1,4 @@ -package mapping +package mappings import ( "bytes" @@ -9,10 +9,8 @@ import ( "strconv" "strings" "text/template" - - //"github.com/gocql/gocql" - //"github.com/jaegertracing/jaeger/pkg/cassandra/config" - "github.com/jaegertracing/jaeger/pkg/cassandra" + //"github.com/jaegertracing/jaeger/pkg/cassandra" + "github.com/gocql/gocql" ) type MappingBuilder struct { @@ -89,18 +87,15 @@ func getCompactionWindow(traceTTL int, compactionWindow string) (int, string, er var compactionWindowUnit string if compactionWindow != "" { - // Check the format of the compaction window matched, err := regexp.MatchString(`^[0-9]+[mhd]$`, compactionWindow) if err != nil { return 0, "", err } if matched { - // Extract size and unit numStr := strings.TrimRight(compactionWindow, "mhd") unitStr := strings.TrimLeft(compactionWindow, numStr) - // Convert size to integer compactionWindowSize, err = strconv.Atoi(numStr) if err != nil { return 0, "", errors.New("invalid compaction window size format") @@ -110,13 +105,11 @@ func getCompactionWindow(traceTTL int, compactionWindow string) (int, string, er return 0, "", errors.New("invalid compaction window size format. Please use numeric value followed by 'm' for minutes, 'h' for hours, or 'd' for days") } } else { - // Calculate default compaction window size and unit traceTTLMinutes := traceTTL / 60 compactionWindowSize = (traceTTLMinutes + 30 - 1) / 30 compactionWindowUnit = "m" } - // Map the unit switch compactionWindowUnit { case "m": compactionWindowUnit = "MINUTES" @@ -194,46 +187,12 @@ func (mb *MappingBuilder) GetSpanServiceMappings() (spanMapping string, serviceM return "", "", err } cqlOutput, _ = RenderCQLTemplate(params, cqlOutput) - // Print or return the generated CQL - fmt.Println(cqlOutput) - + //fmt.Println(cqlOutput) } return cqlOutput, "", err } -// func main() { - -// builder := &MappingBuilder{} -// schema, _, err := builder.GetSpanServiceMappings() -// if err != nil { -// fmt.Println("Error:", err) -// } - - -// cluster := gocql.NewCluster("127.0.0.1") -// cluster.Keyspace = "jaeger_v1_test" -// session, err := cluster.CreateSession() -// if err != nil { -// fmt.Println(err) -// } -// defer session.Close() - -// queries := strings.Split(schema, ";") -// for _, query := range queries { -// trimmedQuery := strings.TrimSpace(query) -// if trimmedQuery != "" { -// fmt.Println(trimmedQuery) -// if err := session.Query(trimmedQuery + ";").Exec(); err != nil { -// fmt.Println("Failed to create sampling_probabilities table: %v", err) -// } else { -// fmt.Println("Table sampling_probabilities created successfully.") -// } -// } -// } - -// } - -func (c cassandra.Session) SchemaInit() { +func SchemaInit(c *gocql.Session) { builder := &MappingBuilder{} schema, _, err := builder.GetSpanServiceMappings() if err != nil { @@ -245,10 +204,10 @@ func (c cassandra.Session) SchemaInit() { if trimmedQuery != "" { fmt.Println(trimmedQuery) if err := c.Query(trimmedQuery + ";").Exec(); err != nil { - fmt.Println("Failed to create sampling_probabilities table: %v", err) + fmt.Printf("Failed to create sampling_probabilities table: %v", err) } else { fmt.Println("Table sampling_probabilities created successfully.") } } } -} \ No newline at end of file +} From 7c05fdcbb97232bfd5ed4c4cc012c5d36f21ec24 Mon Sep 17 00:00:00 2001 From: mehul Date: Tue, 8 Oct 2024 02:02:18 +0530 Subject: [PATCH 4/4] added test Signed-off-by: mehul --- plugin/storage/cassandra/factory.go | 1 - plugin/storage/cassandra/schema/create.go | 13 +- .../storage/cassandra/schema/create_test.go | 113 ++++++++++++++++++ 3 files changed, 121 insertions(+), 6 deletions(-) create mode 100644 plugin/storage/cassandra/schema/create_test.go diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index 9809f736fd4..e26c215bac7 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -141,7 +141,6 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) if err != nil { return err } - f.primarySession = primarySession if f.archiveConfig != nil { diff --git a/plugin/storage/cassandra/schema/create.go b/plugin/storage/cassandra/schema/create.go index 8abee2951fe..72b412f8299 100644 --- a/plugin/storage/cassandra/schema/create.go +++ b/plugin/storage/cassandra/schema/create.go @@ -1,3 +1,6 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + package mappings import ( @@ -9,8 +12,9 @@ import ( "strconv" "strings" "text/template" - //"github.com/jaegertracing/jaeger/pkg/cassandra" + "github.com/gocql/gocql" + //"github.com/jaegertracing/jaeger/pkg/cassandra" ) type MappingBuilder struct { @@ -39,7 +43,6 @@ func (mb *MappingBuilder) renderTemplate(templatePath string) (string, error) { } func RenderCQLTemplate(params MappingBuilder, cqlOutput string) (string, error) { - commentRegex := regexp.MustCompile(`--.*`) cqlOutput = commentRegex.ReplaceAllString(cqlOutput, "") @@ -128,7 +131,7 @@ func (mb *MappingBuilder) GetSpanServiceMappings() (spanMapping string, serviceM traceTTL, _ := strconv.Atoi(getEnv("TRACE_TTL", "172800")) dependenciesTTL, _ := strconv.Atoi(getEnv("DEPENDENCIES_TTL", "0")) cas_version := getEnv("VERSION", "4") - //template := os.Args[1] + // template := os.Args[1] var template string var cqlOutput string if template == "" { @@ -147,7 +150,7 @@ func (mb *MappingBuilder) GetSpanServiceMappings() (spanMapping string, serviceM return } var datacentre, replications string - //var ReplicationFactor int + // var ReplicationFactor int if mode == "prod" { datacentre = getEnv("DATACENTRE", "") if datacentre == "" { @@ -187,7 +190,7 @@ func (mb *MappingBuilder) GetSpanServiceMappings() (spanMapping string, serviceM return "", "", err } cqlOutput, _ = RenderCQLTemplate(params, cqlOutput) - //fmt.Println(cqlOutput) + // fmt.Println(cqlOutput) } return cqlOutput, "", err } diff --git a/plugin/storage/cassandra/schema/create_test.go b/plugin/storage/cassandra/schema/create_test.go new file mode 100644 index 00000000000..43081e4a08e --- /dev/null +++ b/plugin/storage/cassandra/schema/create_test.go @@ -0,0 +1,113 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package mappings + +import ( + "os" + "strings" + "testing" + + "github.com/stretchr/testify/assert" +) + +// func TestRenderTemplate(t *testing.T) { +// mb := &MappingBuilder{ +// Mode: "test", +// Datacentre: "dc1", +// Keyspace: "test_keyspace", +// Replication: "{'class': 'SimpleStrategy', 'replication_factor': '1'}", +// TraceTTL: 172800, +// DependenciesTTL: 0, +// CompactionWindowSize: 30, +// CompactionWindowUnit: "MINUTES", +// } + +// // Create a temporary template file +// templateContent := "CREATE KEYSPACE ${keyspace} WITH replication = ${replication};" +// tmpFile, err := os.CreateTemp("", "template.cql.tmpl") +// assert.NoError(t, err) +// defer os.Remove(tmpFile.Name()) + +// _, err = tmpFile.WriteString(templateContent) +// assert.NoError(t, err) +// tmpFile.Close() + +// renderedOutput, err := mb.renderTemplate(tmpFile.Name()) +// assert.NoError(t, err) +// expectedOutput := "CREATE KEYSPACE test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};" +// assert.Equal(t, expectedOutput, renderedOutput) +// } + +func TestRenderCQLTemplate(t *testing.T) { + params := MappingBuilder{ + Keyspace: "test_keyspace", + Replication: "{'class': 'SimpleStrategy', 'replication_factor': '1'}", + TraceTTL: 172800, + DependenciesTTL: 0, + CompactionWindowSize: 30, + CompactionWindowUnit: "MINUTES", + } + + cqlOutput := ` + CREATE KEYSPACE ${keyspace} WITH replication = ${replication}; + CREATE TABLE ${keyspace}.traces (trace_id UUID PRIMARY KEY, span_id UUID, trace_ttl int); + ` + + renderedOutput, err := RenderCQLTemplate(params, cqlOutput) + assert.NoError(t, err) + expectedOutput := ` + CREATE KEYSPACE test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; + CREATE TABLE test_keyspace.traces (trace_id UUID PRIMARY KEY, span_id UUID, trace_ttl int); + ` + assert.Equal(t, strings.TrimSpace(expectedOutput), strings.TrimSpace(renderedOutput)) +} + +func TestGetCompactionWindow(t *testing.T) { + tests := []struct { + traceTTL int + compactionWindow string + expectedSize int + expectedUnit string + expectError bool + }{ + {172800, "30m", 30, "MINUTES", false}, + {172800, "2h", 2, "HOURS", false}, + {172800, "1d", 1, "DAYS", false}, + {172800, "", 96, "MINUTES", false}, + {172800, "invalid", 0, "", true}, + } + + for _, test := range tests { + size, unit, err := getCompactionWindow(test.traceTTL, test.compactionWindow) + if test.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expectedSize, size) + assert.Equal(t, test.expectedUnit, unit) + } + } +} + +func TestIsValidKeyspace(t *testing.T) { + assert.True(t, isValidKeyspace("valid_keyspace")) + assert.False(t, isValidKeyspace("invalid-keyspace")) + assert.False(t, isValidKeyspace("invalid keyspace")) +} + +func TestGetSpanServiceMappings(t *testing.T) { + os.Setenv("TRACE_TTL", "172800") + os.Setenv("DEPENDENCIES_TTL", "0") + os.Setenv("VERSION", "4") + os.Setenv("MODE", "test") + os.Setenv("DATACENTRE", "dc1") + os.Setenv("REPLICATION_FACTOR", "1") + os.Setenv("KEYSPACE", "test_keyspace") + + builder := &MappingBuilder{} + spanMapping, serviceMapping, err := builder.GetSpanServiceMappings() + assert.NoError(t, err) + assert.NotEmpty(t, spanMapping) + assert.Empty(t, serviceMapping) +}