Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add otel to mainnn #203

Merged
merged 4 commits into from
Nov 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/docker/docker v24.0.7+incompatible
github.com/google/uuid v1.3.1
github.com/sirupsen/logrus v1.9.3
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.28.3
k8s.io/apimachinery v0.28.3
k8s.io/client-go v0.28.3
Expand Down Expand Up @@ -53,7 +54,6 @@ require (
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.4.0 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
Expand Down
128 changes: 127 additions & 1 deletion pkg/knuu/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,37 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
)

// ObsyConfig represents the configuration for the obsy sidecar
type ObsyConfig struct {
// otelCollectorVersion is the version of the otel collector to use
otelCollectorVersion string

// prometheusPort is the port on which the prometheus server will be exposed
prometheusPort int
// prometheusJobName is the name of the prometheus job
prometheusJobName string
// prometheusScrapeInterval is the scrape interval for the prometheus job
prometheusScrapeInterval string

// jaegerGrpcPort is the port on which the jaeger grpc server is exposed
jaegerGrpcPort int
// jaegerThriftCompactPort is the port on which the jaeger thrift compact server is exposed
jaegerThriftCompactPort int
// jaegerThriftHttpPort is the port on which the jaeger thrift http server is exposed
jaegerThriftHttpPort int
// jaegerEndpoint is the endpoint of the jaeger collector where spans will be sent to
jaegerEndpoint string

// otlpPort is the port on which the otlp server is exposed
otlpPort int
// otlpEndpoint is the endpoint of the otlp collector where spans will be sent to
otlpEndpoint string
// otlpUsername is the username to use for the otlp collector
otlpUsername string
// otlpPassword is the password to use for the otlp collector
otlpPassword string
}

// Instance represents a instance
type Instance struct {
name string
Expand Down Expand Up @@ -45,6 +76,7 @@ type Instance struct {
parentInstance *Instance
sidecars []*Instance
fsGroup int64
obsyConfig *ObsyConfig
}

// NewInstance creates a new instance of the Instance struct
Expand All @@ -55,6 +87,20 @@ func NewInstance(name string) (*Instance, error) {
if err != nil {
return nil, fmt.Errorf("error generating k8s name for instance '%s': %w", name, err)
}
obsyConfig := &ObsyConfig{
otelCollectorVersion: "0.83.0",
otlpPort: 0,
prometheusPort: 0,
prometheusJobName: "",
prometheusScrapeInterval: "",
jaegerGrpcPort: 0,
jaegerThriftCompactPort: 0,
jaegerThriftHttpPort: 0,
otlpEndpoint: "",
otlpUsername: "",
otlpPassword: "",
jaegerEndpoint: "",
}
// Create the instance
return &Instance{
name: name,
Expand All @@ -79,6 +125,7 @@ func NewInstance(name string) (*Instance, error) {
isSidecar: false,
parentInstance: nil,
sidecars: make([]*Instance, 0),
obsyConfig: obsyConfig,
}, nil
}

Expand Down Expand Up @@ -665,6 +712,78 @@ func (i *Instance) AddSidecar(sidecar *Instance) error {
return nil
}

// SetOtelCollectorVersion sets the OpenTelemetry collector version for the instance
// This function can only be called in the state 'Preparing' or 'Committed'
func (i *Instance) SetOtelCollectorVersion(version string) error {
if err := i.validateStateForObsy("OpenTelemetry collector version"); err != nil {
return err
}
i.obsyConfig.otelCollectorVersion = version
logrus.Debugf("Set OpenTelemetry collector version '%s' for instance '%s'", version, i.name)
return nil
}

// SetOtelEndpoint sets the OpenTelemetry endpoint for the instance
// This function can only be called in the state 'Preparing' or 'Committed'
func (i *Instance) SetOtelEndpoint(port int) error {
if err := i.validateStateForObsy("OpenTelemetry endpoint"); err != nil {
return err
}
i.obsyConfig.otlpPort = port
logrus.Debugf("Set OpenTelemetry endpoint '%d' for instance '%s'", port, i.name)
return nil
}

// SetPrometheusEndpoint sets the Prometheus endpoint for the instance
// This function can only be called in the state 'Preparing' or 'Committed'
func (i *Instance) SetPrometheusEndpoint(port int, jobName, scapeInterval string) error {
if err := i.validateStateForObsy("Prometheus endpoint"); err != nil {
return err
}
i.obsyConfig.prometheusPort = port
i.obsyConfig.prometheusJobName = jobName
i.obsyConfig.prometheusScrapeInterval = scapeInterval
logrus.Debugf("Set Prometheus endpoint '%d' for instance '%s'", port, i.name)
return nil
}

// SetJaegerEndpoint sets the Jaeger endpoint for the instance
// This function can only be called in the state 'Preparing' or 'Committed'
func (i *Instance) SetJaegerEndpoint(grpcPort, thriftCompactPort, thriftHttpPort int) error {
if err := i.validateStateForObsy("Jaeger endpoint"); err != nil {
return err
}
i.obsyConfig.jaegerGrpcPort = grpcPort
i.obsyConfig.jaegerThriftCompactPort = thriftCompactPort
i.obsyConfig.jaegerThriftHttpPort = thriftHttpPort
logrus.Debugf("Set Jaeger endpoints '%d', '%d' and '%d' for instance '%s'", grpcPort, thriftCompactPort, thriftHttpPort, i.name)
return nil
}

// SetOtlpExporter sets the OTLP exporter for the instance
// This function can only be called in the state 'Preparing' or 'Committed'
func (i *Instance) SetOtlpExporter(endpoint, username, password string) error {
if err := i.validateStateForObsy("OTLP exporter"); err != nil {
return err
}
i.obsyConfig.otlpEndpoint = endpoint
i.obsyConfig.otlpUsername = username
i.obsyConfig.otlpPassword = password
logrus.Debugf("Set OTLP exporter '%s' for instance '%s'", endpoint, i.name)
return nil
}

// SetJaegerExporter sets the Jaeger exporter for the instance
// This function can only be called in the state 'Preparing' or 'Committed'
func (i *Instance) SetJaegerExporter(endpoint string) error {
if err := i.validateStateForObsy("Jaeger exporter"); err != nil {
return err
}
i.obsyConfig.jaegerEndpoint = endpoint
logrus.Debugf("Set Jaeger exporter '%s' for instance '%s'", endpoint, i.name)
return nil
}

// StartWithoutWait starts the instance without waiting for it to be ready
// This function can only be called in the state 'Committed' or 'Stopped'
func (i *Instance) StartWithoutWait() error {
Expand All @@ -683,6 +802,12 @@ func (i *Instance) StartWithoutWait() error {
return fmt.Errorf("starting a sidecar is not allowed")
}
if i.state == Committed {
// deploy otel collector if observability is enabled
if i.isObservabilityEnabled() {
if err := i.addOtelCollectorSidecar(); err != nil {
return fmt.Errorf("error adding OpenTelemetry collector sidecar for instance '%s': %w", i.k8sName, err)
}
}

if err := i.deployResources(); err != nil {
return fmt.Errorf("error deploying resources for instance '%s': %w", i.k8sName, err)
Expand Down Expand Up @@ -848,9 +973,10 @@ func (i *Instance) Destroy() error {
}

if err := applyFunctionToInstances(i.sidecars, func(sidecar Instance) error {
logrus.Debugf("Destroying sidecar resources from '%s'", sidecar.k8sName)
return sidecar.destroyResources()
}); err != nil {
return err
return fmt.Errorf("error destroying resources for sidecars of instance '%s': %w", i.k8sName, err)
}

i.state = Destroyed
Expand Down
52 changes: 42 additions & 10 deletions pkg/knuu/instance_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,15 +294,20 @@ func (i *Instance) destroyResources() error {
return fmt.Errorf("error destroying service for instance '%s': %w", i.k8sName, err)
}

// enable network when network is disabled
disableNetwork, err := i.NetworkIsDisabled()
if err != nil {
return fmt.Errorf("error checking network status for instance '%s': %w", i.k8sName, err)
}
if disableNetwork {
err := i.EnableNetwork()
// disable network only for non-sidecar instances
if !i.isSidecar {
// enable network when network is disabled
disableNetwork, err := i.NetworkIsDisabled()
if err != nil {
return fmt.Errorf("error enabling network for instance '%s': %w", i.k8sName, err)
logrus.Debugf("error checking network status for instance")
return fmt.Errorf("error checking network status for instance '%s': %w", i.k8sName, err)
}
if disableNetwork {
err := i.EnableNetwork()
if err != nil {
logrus.Debugf("error enabling network for instance")
return fmt.Errorf("error enabling network for instance '%s': %w", i.k8sName, err)
}
}
}

Expand Down Expand Up @@ -342,6 +347,7 @@ func (i *Instance) cloneWithSuffix(suffix string) *Instance {
isSidecar: false,
parentInstance: nil,
sidecars: clonedSidecars,
obsyConfig: i.obsyConfig,
}
}

Expand Down Expand Up @@ -486,16 +492,42 @@ func (i *Instance) setImageWithGracePeriod(imageName string, gracePeriod *int64)
func applyFunctionToInstances(instances []*Instance, function func(sidecar Instance) error) error {
for _, i := range instances {
if err := function(*i); err != nil {
return fmt.Errorf("error")
return fmt.Errorf("error applying function to instance '%s': %w", i.k8sName, err)
}
}
return nil
}

func setStateForSidecars(sidecars []*Instance, state InstanceState) {
// We don't handle errors here, as the function can't return an error
applyFunctionToInstances(sidecars, func(sidecar Instance) error {
err := applyFunctionToInstances(sidecars, func(sidecar Instance) error {
sidecar.state = state
return nil
})
if err != nil {
return
}
}

// isObservabilityEnabled returns true if observability is enabled
func (i *Instance) isObservabilityEnabled() bool {
return i.obsyConfig.otlpPort != 0 || i.obsyConfig.prometheusPort != 0 || i.obsyConfig.jaegerGrpcPort != 0 || i.obsyConfig.jaegerThriftCompactPort != 0 || i.obsyConfig.jaegerThriftHttpPort != 0
}

func (i *Instance) validateStateForObsy(endpoint string) error {
if !i.IsInState(Preparing, Committed) {
return fmt.Errorf("setting %s is only allowed in state 'Preparing' or 'Committed'. Current state is '%s'", endpoint, i.state.String())
}
return nil
}

func (i *Instance) addOtelCollectorSidecar() error {
otelSidecar, err := i.createOtelCollectorInstance()
if err != nil {
return fmt.Errorf("error creating otel collector instance '%s': %w", i.k8sName, err)
}
if err := i.AddSidecar(otelSidecar); err != nil {
return fmt.Errorf("error adding otel collector sidecar to instance '%s': %w", i.k8sName, err)
}
return nil
}
Loading