-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Cassandra init #6072
base: main
Are you sure you want to change the base?
Cassandra init #6072
Changes from 5 commits
d0a3ed3
4a17743
51922fc
09eb418
7c05fdc
3aa0941
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,216 @@ | ||
// Copyright (c) 2024 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package mappings | ||
|
||
import ( | ||
"bytes" | ||
"errors" | ||
"fmt" | ||
"os" | ||
"regexp" | ||
"strconv" | ||
"strings" | ||
"text/template" | ||
|
||
"github.com/gocql/gocql" | ||
//"github.com/jaegertracing/jaeger/pkg/cassandra" | ||
) | ||
|
||
type MappingBuilder struct { | ||
Mode string | ||
Datacentre string | ||
Keyspace string | ||
Replication string | ||
TraceTTL int | ||
DependenciesTTL int | ||
CompactionWindowSize int | ||
CompactionWindowUnit string | ||
} | ||
|
||
func (mb *MappingBuilder) renderTemplate(templatePath string) (string, error) { | ||
tmpl, err := template.ParseFiles(templatePath) | ||
if err != nil { | ||
return "", err | ||
} | ||
|
||
var renderedOutput bytes.Buffer | ||
if err := tmpl.Execute(&renderedOutput, mb); err != nil { | ||
return "", err | ||
} | ||
|
||
return renderedOutput.String(), nil | ||
} | ||
|
||
func RenderCQLTemplate(params MappingBuilder, cqlOutput string) (string, error) { | ||
commentRegex := regexp.MustCompile(`--.*`) | ||
cqlOutput = commentRegex.ReplaceAllString(cqlOutput, "") | ||
|
||
lines := strings.Split(cqlOutput, "\n") | ||
var filteredLines []string | ||
|
||
for _, line := range lines { | ||
if strings.TrimSpace(line) != "" { | ||
filteredLines = append(filteredLines, line) | ||
} | ||
} | ||
|
||
cqlOutput = strings.Join(filteredLines, "\n") | ||
|
||
replacements := map[string]string{ | ||
"${keyspace}": params.Keyspace, | ||
"${replication}": params.Replication, | ||
"${trace_ttl}": fmt.Sprintf("%d", params.TraceTTL), | ||
"${dependencies_ttl}": fmt.Sprintf("%d", params.DependenciesTTL), | ||
"${compaction_window_size}": fmt.Sprintf("%d", params.CompactionWindowSize), | ||
"${compaction_window_unit}": params.CompactionWindowUnit, | ||
} | ||
|
||
for placeholder, value := range replacements { | ||
cqlOutput = strings.ReplaceAll(cqlOutput, placeholder, value) | ||
} | ||
|
||
return cqlOutput, nil | ||
} | ||
|
||
func getEnv(key, defaultValue string) string { | ||
if value, exists := os.LookupEnv(key); exists { | ||
return value | ||
} | ||
return defaultValue | ||
} | ||
|
||
func isValidKeyspace(keyspace string) bool { | ||
match, _ := regexp.MatchString(`^[a-zA-Z0-9_]+$`, keyspace) | ||
return match | ||
} | ||
|
||
func getCompactionWindow(traceTTL int, compactionWindow string) (int, string, error) { | ||
var compactionWindowSize int | ||
var compactionWindowUnit string | ||
|
||
if compactionWindow != "" { | ||
matched, err := regexp.MatchString(`^[0-9]+[mhd]$`, compactionWindow) | ||
if err != nil { | ||
return 0, "", err | ||
} | ||
|
||
if matched { | ||
numStr := strings.TrimRight(compactionWindow, "mhd") | ||
unitStr := strings.TrimLeft(compactionWindow, numStr) | ||
|
||
compactionWindowSize, err = strconv.Atoi(numStr) | ||
if err != nil { | ||
return 0, "", errors.New("invalid compaction window size format") | ||
} | ||
compactionWindowUnit = unitStr | ||
} else { | ||
return 0, "", errors.New("invalid compaction window size format. Please use numeric value followed by 'm' for minutes, 'h' for hours, or 'd' for days") | ||
} | ||
} else { | ||
traceTTLMinutes := traceTTL / 60 | ||
compactionWindowSize = (traceTTLMinutes + 30 - 1) / 30 | ||
compactionWindowUnit = "m" | ||
} | ||
|
||
switch compactionWindowUnit { | ||
case "m": | ||
compactionWindowUnit = "MINUTES" | ||
case "h": | ||
compactionWindowUnit = "HOURS" | ||
case "d": | ||
compactionWindowUnit = "DAYS" | ||
default: | ||
return 0, "", errors.New("invalid compaction window unit") | ||
} | ||
|
||
return compactionWindowSize, compactionWindowUnit, nil | ||
} | ||
|
||
func (mb *MappingBuilder) GetSpanServiceMappings() (spanMapping string, serviceMapping string, err error) { | ||
traceTTL, _ := strconv.Atoi(getEnv("TRACE_TTL", "172800")) | ||
dependenciesTTL, _ := strconv.Atoi(getEnv("DEPENDENCIES_TTL", "0")) | ||
cas_version := getEnv("VERSION", "4") | ||
// template := os.Args[1] | ||
var template string | ||
var cqlOutput string | ||
if template == "" { | ||
switch cas_version { | ||
case "3": | ||
template = "./v003.cql.tmpl" | ||
case "4": | ||
template = "./v004.cql.tmpl" | ||
default: | ||
template = "./v004.cql.tmpl" | ||
} | ||
mode := getEnv("MODE", "") | ||
|
||
if mode == "" { | ||
fmt.Println("missing MODE parameter") | ||
return | ||
} | ||
var datacentre, replications string | ||
// var ReplicationFactor int | ||
if mode == "prod" { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we need this distinction anymore. Users can provide precise values for parameters, we don't have to have Jaeger guess those parameters based on the MODE There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. so should i remove the mode attribute and take prod mode as default There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes. Whichever parameters the prod attribute affects today should simply be exposed as configuration options for the user. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. btw, we do not need to enable this schema-management capability in jaeger-v1, especially if we need new configuration for it - we can make it only configurable in v2 (which is easier, just define the struct with |
||
datacentre = getEnv("DATACENTRE", "") | ||
if datacentre == "" { | ||
fmt.Println("missing DATACENTRE parameter for prod mode") | ||
return | ||
} | ||
replicationFactor, _ := strconv.Atoi(getEnv("REPLICATION_FACTOR", "2")) | ||
replications = fmt.Sprintf("{'class': 'NetworkTopologyStrategy', '%s': %d}", datacentre, replicationFactor) | ||
} else if mode == "test" { | ||
datacentre = getEnv("DATACENTRE", "") | ||
replicationFactor, _ := strconv.Atoi(getEnv("REPLICATION_FACTOR", "1")) | ||
replications = fmt.Sprintf("{'class': 'SimpleStrategy', 'replication_factor': '%d'}", replicationFactor) | ||
} else { | ||
fmt.Printf("invaild mode: %s, expecting 'prod' or 'test'", mode) | ||
return | ||
} | ||
keyspace := getEnv("KEYSPACE", fmt.Sprintf("jaeger_v1_%s", mode)) | ||
if !isValidKeyspace(keyspace) { | ||
fmt.Printf("invaild characters in KEYSPACE=%s parameter , please use letters, digits, and underscores only", keyspace) | ||
return | ||
} | ||
CompactionWindowSize, compactionWindowUnit, _ := getCompactionWindow(traceTTL, getEnv("COMPACTION_WINDOW_SIZE", "")) | ||
|
||
params := MappingBuilder{ | ||
Mode: mode, | ||
Datacentre: datacentre, | ||
Keyspace: keyspace, | ||
Replication: replications, | ||
TraceTTL: traceTTL, | ||
DependenciesTTL: dependenciesTTL, | ||
CompactionWindowSize: CompactionWindowSize, | ||
CompactionWindowUnit: compactionWindowUnit, | ||
} | ||
// Render the template | ||
cqlOutput, err = params.renderTemplate(template) | ||
if err != nil { | ||
return "", "", err | ||
} | ||
cqlOutput, _ = RenderCQLTemplate(params, cqlOutput) | ||
// fmt.Println(cqlOutput) | ||
} | ||
return cqlOutput, "", err | ||
} | ||
|
||
func SchemaInit(c *gocql.Session) { | ||
builder := &MappingBuilder{} | ||
schema, _, err := builder.GetSpanServiceMappings() | ||
Comment on lines
+199
to
+200
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. where does the term "mapping" come from? If you're copying it from Elasticsearch, there "mapping" is actually a term understood by ES. No such thing in Cassandra. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should i call it template or cassandra_template ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SchemaBuilder |
||
if err != nil { | ||
fmt.Println("Error:", err) | ||
} | ||
queries := strings.Split(schema, ";") | ||
for _, query := range queries { | ||
trimmedQuery := strings.TrimSpace(query) | ||
if trimmedQuery != "" { | ||
fmt.Println(trimmedQuery) | ||
if err := c.Query(trimmedQuery + ";").Exec(); err != nil { | ||
fmt.Printf("Failed to create sampling_probabilities table: %v", err) | ||
} else { | ||
fmt.Println("Table sampling_probabilities created successfully.") | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
// Copyright (c) 2024 The Jaeger Authors. | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
package mappings | ||
|
||
import ( | ||
"os" | ||
"strings" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
// func TestRenderTemplate(t *testing.T) { | ||
// mb := &MappingBuilder{ | ||
// Mode: "test", | ||
// Datacentre: "dc1", | ||
// Keyspace: "test_keyspace", | ||
// Replication: "{'class': 'SimpleStrategy', 'replication_factor': '1'}", | ||
// TraceTTL: 172800, | ||
// DependenciesTTL: 0, | ||
// CompactionWindowSize: 30, | ||
// CompactionWindowUnit: "MINUTES", | ||
// } | ||
|
||
// // Create a temporary template file | ||
// templateContent := "CREATE KEYSPACE ${keyspace} WITH replication = ${replication};" | ||
// tmpFile, err := os.CreateTemp("", "template.cql.tmpl") | ||
// assert.NoError(t, err) | ||
// defer os.Remove(tmpFile.Name()) | ||
|
||
// _, err = tmpFile.WriteString(templateContent) | ||
// assert.NoError(t, err) | ||
// tmpFile.Close() | ||
|
||
// renderedOutput, err := mb.renderTemplate(tmpFile.Name()) | ||
// assert.NoError(t, err) | ||
// expectedOutput := "CREATE KEYSPACE test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};" | ||
// assert.Equal(t, expectedOutput, renderedOutput) | ||
// } | ||
|
||
func TestRenderCQLTemplate(t *testing.T) { | ||
params := MappingBuilder{ | ||
Keyspace: "test_keyspace", | ||
Replication: "{'class': 'SimpleStrategy', 'replication_factor': '1'}", | ||
TraceTTL: 172800, | ||
DependenciesTTL: 0, | ||
CompactionWindowSize: 30, | ||
CompactionWindowUnit: "MINUTES", | ||
} | ||
|
||
cqlOutput := ` | ||
CREATE KEYSPACE ${keyspace} WITH replication = ${replication}; | ||
CREATE TABLE ${keyspace}.traces (trace_id UUID PRIMARY KEY, span_id UUID, trace_ttl int); | ||
` | ||
|
||
renderedOutput, err := RenderCQLTemplate(params, cqlOutput) | ||
assert.NoError(t, err) | ||
expectedOutput := ` | ||
CREATE KEYSPACE test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; | ||
CREATE TABLE test_keyspace.traces (trace_id UUID PRIMARY KEY, span_id UUID, trace_ttl int); | ||
` | ||
assert.Equal(t, strings.TrimSpace(expectedOutput), strings.TrimSpace(renderedOutput)) | ||
} | ||
|
||
func TestGetCompactionWindow(t *testing.T) { | ||
tests := []struct { | ||
traceTTL int | ||
compactionWindow string | ||
expectedSize int | ||
expectedUnit string | ||
expectError bool | ||
}{ | ||
{172800, "30m", 30, "MINUTES", false}, | ||
{172800, "2h", 2, "HOURS", false}, | ||
{172800, "1d", 1, "DAYS", false}, | ||
{172800, "", 96, "MINUTES", false}, | ||
{172800, "invalid", 0, "", true}, | ||
} | ||
|
||
for _, test := range tests { | ||
size, unit, err := getCompactionWindow(test.traceTTL, test.compactionWindow) | ||
if test.expectError { | ||
assert.Error(t, err) | ||
} else { | ||
assert.NoError(t, err) | ||
assert.Equal(t, test.expectedSize, size) | ||
assert.Equal(t, test.expectedUnit, unit) | ||
} | ||
} | ||
} | ||
|
||
func TestIsValidKeyspace(t *testing.T) { | ||
assert.True(t, isValidKeyspace("valid_keyspace")) | ||
assert.False(t, isValidKeyspace("invalid-keyspace")) | ||
assert.False(t, isValidKeyspace("invalid keyspace")) | ||
} | ||
|
||
func TestGetSpanServiceMappings(t *testing.T) { | ||
os.Setenv("TRACE_TTL", "172800") | ||
os.Setenv("DEPENDENCIES_TTL", "0") | ||
os.Setenv("VERSION", "4") | ||
os.Setenv("MODE", "test") | ||
os.Setenv("DATACENTRE", "dc1") | ||
os.Setenv("REPLICATION_FACTOR", "1") | ||
os.Setenv("KEYSPACE", "test_keyspace") | ||
|
||
builder := &MappingBuilder{} | ||
spanMapping, serviceMapping, err := builder.GetSpanServiceMappings() | ||
assert.NoError(t, err) | ||
assert.NotEmpty(t, spanMapping) | ||
assert.Empty(t, serviceMapping) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure how you are planning to use those files. They use variable placeholders like
${keyspace}
, which works in shell substitution, but will not work as a Go template. We should be using Go template the this functionality. Your could do a search/replace, something like${keyspace}
==>{{ .Keyspace }}
. But it would be easier just to copy and change the syntax. You might need to use conditional clauses, like{{- if .UseILM}}
from ES templates.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this cqlOutput provides the same exact output as create.sh it is done in the
RenderCQLTemplate
function