From 717b8b2061d2862b7d3ad19f252a340ffbfff42e Mon Sep 17 00:00:00 2001 From: Smuu <18609909+Smuu@users.noreply.github.com> Date: Mon, 27 Nov 2023 16:24:36 +0100 Subject: [PATCH] feat: add support for bittwister Signed-off-by: Smuu <18609909+Smuu@users.noreply.github.com> --- pkg/k8s/k8s_pod.go | 48 ++++----- pkg/knuu/instance.go | 189 +++++++++++++++++++++++++++++++----- pkg/knuu/instance_helper.go | 131 ++++++++++++++++++++----- 3 files changed, 296 insertions(+), 72 deletions(-) diff --git a/pkg/k8s/k8s_pod.go b/pkg/k8s/k8s_pod.go index 6501b90..9471f0a 100644 --- a/pkg/k8s/k8s_pod.go +++ b/pkg/k8s/k8s_pod.go @@ -90,19 +90,20 @@ func NewFile(source, dest string) *File { // ContainerConfig contains the specifications for creating a new Container object type ContainerConfig struct { - Name string // Name to assign to the Container - Image string // Name of the container image to use for the container - Command []string // Command to run in the container - Args []string // Arguments to pass to the command in the container - Env map[string]string // Environment variables to set in the container - Volumes []*Volume // Volumes to mount in the Pod - MemoryRequest string // Memory request for the container - MemoryLimit string // Memory limit for the container - CPURequest string // CPU request for the container - LivenessProbe *v1.Probe // Liveness probe for the container - ReadinessProbe *v1.Probe // Readiness probe for the container - StartupProbe *v1.Probe // Startup probe for the container - Files []*File // Files to add to the Pod + Name string // Name to assign to the Container + Image string // Name of the container image to use for the container + Command []string // Command to run in the container + Args []string // Arguments to pass to the command in the container + Env map[string]string // Environment variables to set in the container + Volumes []*Volume // Volumes to mount in the Pod + MemoryRequest string // Memory request for the container + MemoryLimit string // Memory limit for the container + CPURequest string // CPU request for the container + LivenessProbe *v1.Probe // Liveness probe for the container + ReadinessProbe *v1.Probe // Readiness probe for the container + StartupProbe *v1.Probe // Startup probe for the container + Files []*File // Files to add to the Pod + SecurityContext *v1.SecurityContext // Security context for the container } // PodConfig contains the specifications for creating a new Pod object @@ -439,16 +440,17 @@ func prepareContainer(config ContainerConfig) (v1.Container, error) { } return v1.Container{ - Name: config.Name, - Image: config.Image, - Command: config.Command, - Args: config.Args, - Env: podEnv, - VolumeMounts: containerVolumes, - Resources: resources, - LivenessProbe: config.LivenessProbe, - ReadinessProbe: config.ReadinessProbe, - StartupProbe: config.StartupProbe, + Name: config.Name, + Image: config.Image, + Command: config.Command, + Args: config.Args, + Env: podEnv, + VolumeMounts: containerVolumes, + Resources: resources, + LivenessProbe: config.LivenessProbe, + ReadinessProbe: config.ReadinessProbe, + StartupProbe: config.StartupProbe, + SecurityContext: config.SecurityContext, }, nil } diff --git a/pkg/knuu/instance.go b/pkg/knuu/instance.go index ca81c66..ca191e3 100644 --- a/pkg/knuu/instance.go +++ b/pkg/knuu/instance.go @@ -48,6 +48,30 @@ type ObsyConfig struct { otlpPassword string } +// SecurityContext represents the security settings for a container +type SecurityContext struct { + // Privileged indicates whether the container should be run in privileged mode + privileged bool + + // CapabilitiesAdd is the list of capabilities to add to the container + capabilitiesAdd []string +} + +// NetworkConfig represents the configuration for the network +type NetworkConfig struct { + // bandwidth is the bandwidth limit in bps (e.g. 1000 for 1Kbps) + bandwidth int + + // jitter is the network jitter in milliseconds (e.g. 10 for 10ms) + jitter int + + // latency is the network latency in milliseconds (e.g. 100 for 100ms) + latency int + + // packetLoss is the network packet loss rate (e.g. 10 for 10% packet loss) + packetLoss int +} + // Instance represents a instance type Instance struct { name string @@ -77,6 +101,8 @@ type Instance struct { sidecars []*Instance fsGroup int64 obsyConfig *ObsyConfig + securityContext *SecurityContext + networkConfig *NetworkConfig } // NewInstance creates a new instance of the Instance struct @@ -101,31 +127,43 @@ func NewInstance(name string) (*Instance, error) { otlpPassword: "", jaegerEndpoint: "", } + securityContext := &SecurityContext{ + privileged: false, + capabilitiesAdd: make([]string, 0), + } + networkConfig := &NetworkConfig{ + bandwidth: 0, + jitter: 0, + latency: 0, + packetLoss: 0, + } // Create the instance return &Instance{ - name: name, - k8sName: k8sName, - imageName: "", - state: None, - instanceType: BasicInstance, - portsTCP: make([]int, 0), - portsUDP: make([]int, 0), - command: make([]string, 0), - args: make([]string, 0), - env: make(map[string]string), - volumes: make([]*k8s.Volume, 0), - memoryRequest: "", - memoryLimit: "", - cpuRequest: "", - policyRules: make([]rbacv1.PolicyRule, 0), - livenessProbe: nil, - readinessProbe: nil, - startupProbe: nil, - files: make([]*k8s.File, 0), - isSidecar: false, - parentInstance: nil, - sidecars: make([]*Instance, 0), - obsyConfig: obsyConfig, + name: name, + k8sName: k8sName, + imageName: "", + state: None, + instanceType: BasicInstance, + portsTCP: make([]int, 0), + portsUDP: make([]int, 0), + command: make([]string, 0), + args: make([]string, 0), + env: make(map[string]string), + volumes: make([]*k8s.Volume, 0), + memoryRequest: "", + memoryLimit: "", + cpuRequest: "", + policyRules: make([]rbacv1.PolicyRule, 0), + livenessProbe: nil, + readinessProbe: nil, + startupProbe: nil, + files: make([]*k8s.File, 0), + isSidecar: false, + parentInstance: nil, + sidecars: make([]*Instance, 0), + obsyConfig: obsyConfig, + securityContext: securityContext, + networkConfig: networkConfig, }, nil } @@ -784,6 +822,41 @@ func (i *Instance) SetJaegerExporter(endpoint string) error { return nil } +// SetPrivileged sets the privileged status for the instance +// This function can only be called in the state 'Preparing' or 'Committed' +func (i *Instance) SetPrivileged(privileged bool) error { + if !i.IsInState(Preparing, Committed) { + return fmt.Errorf("setting privileged is only allowed in state 'Preparing' or 'Committed'. Current state is '%s'", i.state.String()) + } + i.securityContext.privileged = privileged + logrus.Debugf("Set privileged to '%t' for instance '%s'", privileged, i.name) + return nil +} + +// AddCapability adds a capability to the instance +// This function can only be called in the state 'Preparing' or 'Committed' +func (i *Instance) AddCapability(capability string) error { + if !i.IsInState(Preparing, Committed) { + return fmt.Errorf("adding capability is only allowed in state 'Preparing' or 'Committed'. Current state is '%s'", i.state.String()) + } + i.securityContext.capabilitiesAdd = append(i.securityContext.capabilitiesAdd, capability) + logrus.Debugf("Added capability '%s' to instance '%s'", capability, i.name) + return nil +} + +// AddCapabilities adds multiple capabilities to the instance +// This function can only be called in the state 'Preparing' or 'Committed' +func (i *Instance) AddCapabilities(capabilities []string) error { + if !i.IsInState(Preparing, Committed) { + return fmt.Errorf("adding capabilities is only allowed in state 'Preparing' or 'Committed'. Current state is '%s'", i.state.String()) + } + for _, capability := range capabilities { + i.securityContext.capabilitiesAdd = append(i.securityContext.capabilitiesAdd, capability) + logrus.Debugf("Added capability '%s' to instance '%s'", capability, i.name) + } + return nil +} + // StartWithoutWait starts the instance without waiting for it to be ready // This function can only be called in the state 'Committed' or 'Stopped' func (i *Instance) StartWithoutWait() error { @@ -808,6 +881,12 @@ func (i *Instance) StartWithoutWait() error { return fmt.Errorf("error adding OpenTelemetry collector sidecar for instance '%s': %w", i.k8sName, err) } } + // check if networking is configured and if so, add the corresponding sidecar + if i.isNetworkConfigSet() { + if err := i.addNetworkConfigSidecar(); err != nil { + return fmt.Errorf("error adding network sidecar for instance '%s': %w", i.k8sName, err) + } + } if err := i.deployResources(); err != nil { return fmt.Errorf("error deploying resources for instance '%s': %w", i.k8sName, err) @@ -896,6 +975,70 @@ func (i *Instance) DisableNetwork() error { return nil } +// SetBandwidthLimit sets the bandwidth limit of the instance +// bandwidth limit in bps (e.g. 1000 for 1Kbps) +// Currently, only one of bandwidth, jitter, latency or packet loss can be set +// This function can only be called in the state 'Commited' +func (i *Instance) SetBandwidthLimit(limit int) error { + if !i.IsInState(Committed) { + return fmt.Errorf("setting bandwidth limit is only allowed in state 'Committed'. Current state is '%s'", i.state.String()) + } + if i.isNetworkConfigSet() { + return fmt.Errorf("setting bandwidth limit is not allowed if network config is already set") + } + i.networkConfig.bandwidth = limit + logrus.Debugf("Set bandwidth limit to '%d' in instance '%s'", limit, i.name) + return nil +} + +// SetJitter sets the jitter of the instance +// jitter in ms (e.g. 1000 for 1s) +// Currently, only one of bandwidth, jitter, latency or packet loss can be set +// This function can only be called in the state 'Commited' +func (i *Instance) SetJitter(jitter int) error { + if !i.IsInState(Committed) { + return fmt.Errorf("setting jitter is only allowed in state 'Committed'. Current state is '%s'", i.state.String()) + } + if i.isNetworkConfigSet() { + return fmt.Errorf("setting jitter is not allowed if network config is already set") + } + i.networkConfig.jitter = jitter + logrus.Debugf("Set jitter to '%d' in instance '%s'", jitter, i.name) + return nil +} + +// SetLatency sets the latency of the instance +// latency in ms (e.g. 1000 for 1s) +// Currently, only one of bandwidth, jitter, latency or packet loss can be set +// This function can only be called in the state 'Commited' +func (i *Instance) SetLatency(latency int) error { + if !i.IsInState(Committed) { + return fmt.Errorf("setting latency is only allowed in state 'Committed'. Current state is '%s'", i.state.String()) + } + if i.isNetworkConfigSet() { + return fmt.Errorf("setting latency is not allowed if network config is already set") + } + i.networkConfig.latency = latency + logrus.Debugf("Set latency to '%d' in instance '%s'", latency, i.name) + return nil +} + +// SetPacketLoss sets the packet loss of the instance +// packet loss in percent (e.g. 10 for 10%) +// Currently, only one of bandwidth, jitter, latency or packet loss can be set +// This function can only be called in the state 'Commited' +func (i *Instance) SetPacketLoss(packetLoss int) error { + if !i.IsInState(Committed) { + return fmt.Errorf("setting packet loss is only allowed in state 'Committed'. Current state is '%s'", i.state.String()) + } + if i.isNetworkConfigSet() { + return fmt.Errorf("setting packet loss is not allowed if network config is already set") + } + i.networkConfig.packetLoss = packetLoss + logrus.Debugf("Set packet loss to '%d' in instance '%s'", packetLoss, i.name) + return nil +} + // EnableNetwork enables the network of the instance // This function can only be called in the state 'Started' func (i *Instance) EnableNetwork() error { diff --git a/pkg/knuu/instance_helper.go b/pkg/knuu/instance_helper.go index b339e8f..6886277 100644 --- a/pkg/knuu/instance_helper.go +++ b/pkg/knuu/instance_helper.go @@ -11,6 +11,7 @@ import ( "github.com/celestiaorg/knuu/pkg/k8s" "github.com/google/uuid" "github.com/sirupsen/logrus" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" ) @@ -322,6 +323,12 @@ func (i *Instance) cloneWithSuffix(suffix string) *Instance { clonedSidecars[i] = sidecar.cloneWithSuffix(suffix) } + // Deep copy of networkConfig to ensure cloned instance has its own copy + clonedNetworkConfig := *i.networkConfig + + // Deep copy of securityContext to ensure cloned instance has its own copy + clonedSecurityContext := *i.securityContext + return &Instance{ name: i.name + suffix, k8sName: i.k8sName + suffix, @@ -348,6 +355,8 @@ func (i *Instance) cloneWithSuffix(suffix string) *Instance { parentInstance: nil, sidecars: clonedSidecars, obsyConfig: i.obsyConfig, + securityContext: &clonedSecurityContext, + networkConfig: &clonedNetworkConfig, } } @@ -411,41 +420,66 @@ func (i *Instance) addFileToBuilder(src, dest, chown string) error { return nil } +// prepareSecurityContext creates a v1.SecurityContext from a given SecurityContext. +func prepareSecurityContext(config *SecurityContext) *v1.SecurityContext { + securityContext := &v1.SecurityContext{} + + if config != nil { + if config.privileged { + securityContext.Privileged = &config.privileged + } + if len(config.capabilitiesAdd) > 0 { + capabilities := make([]v1.Capability, len(config.capabilitiesAdd)) + for i, cap := range config.capabilitiesAdd { + capabilities[i] = v1.Capability(cap) + } + securityContext.Capabilities = &v1.Capabilities{ + Add: capabilities, + } + } + } + + return securityContext +} + // prepareConfig prepares the config for the instance func (i *Instance) prepareStatefulSetConfig() k8s.StatefulSetConfig { + // Generate the container configuration containerConfig := k8s.ContainerConfig{ - Name: i.k8sName, - Image: i.imageName, - Command: i.command, - Args: i.args, - Env: i.env, - Volumes: i.volumes, - MemoryRequest: i.memoryRequest, - MemoryLimit: i.memoryLimit, - CPURequest: i.cpuRequest, - LivenessProbe: i.livenessProbe, - ReadinessProbe: i.readinessProbe, - StartupProbe: i.startupProbe, - Files: i.files, + Name: i.k8sName, + Image: i.imageName, + Command: i.command, + Args: i.args, + Env: i.env, + Volumes: i.volumes, + MemoryRequest: i.memoryRequest, + MemoryLimit: i.memoryLimit, + CPURequest: i.cpuRequest, + LivenessProbe: i.livenessProbe, + ReadinessProbe: i.readinessProbe, + StartupProbe: i.startupProbe, + Files: i.files, + SecurityContext: prepareSecurityContext(i.securityContext), } // Generate the sidecar configurations sidecarConfigs := make([]k8s.ContainerConfig, 0) for _, sidecar := range i.sidecars { sidecarConfigs = append(sidecarConfigs, k8s.ContainerConfig{ - Name: sidecar.k8sName, - Image: sidecar.imageName, - Command: sidecar.command, - Args: sidecar.args, - Env: sidecar.env, - Volumes: sidecar.volumes, - MemoryRequest: sidecar.memoryRequest, - MemoryLimit: sidecar.memoryLimit, - CPURequest: sidecar.cpuRequest, - LivenessProbe: sidecar.livenessProbe, - ReadinessProbe: sidecar.readinessProbe, - StartupProbe: sidecar.startupProbe, - Files: sidecar.files, + Name: sidecar.k8sName, + Image: sidecar.imageName, + Command: sidecar.command, + Args: sidecar.args, + Env: sidecar.env, + Volumes: sidecar.volumes, + MemoryRequest: sidecar.memoryRequest, + MemoryLimit: sidecar.memoryLimit, + CPURequest: sidecar.cpuRequest, + LivenessProbe: sidecar.livenessProbe, + ReadinessProbe: sidecar.readinessProbe, + StartupProbe: sidecar.startupProbe, + Files: sidecar.files, + SecurityContext: prepareSecurityContext(sidecar.securityContext), }) } // Generate the pod configuration @@ -514,6 +548,11 @@ func (i *Instance) isObservabilityEnabled() bool { return i.obsyConfig.otlpPort != 0 || i.obsyConfig.prometheusPort != 0 || i.obsyConfig.jaegerGrpcPort != 0 || i.obsyConfig.jaegerThriftCompactPort != 0 || i.obsyConfig.jaegerThriftHttpPort != 0 } +// isNetworkConfigSet returns true if any network config value is set +func (i *Instance) isNetworkConfigSet() bool { + return i.networkConfig.bandwidth != 0 || i.networkConfig.jitter != 0 || i.networkConfig.latency != 0 || i.networkConfig.packetLoss != 0 +} + func (i *Instance) validateStateForObsy(endpoint string) error { if !i.IsInState(Preparing, Committed) { return fmt.Errorf("setting %s is only allowed in state 'Preparing' or 'Committed'. Current state is '%s'", endpoint, i.state.String()) @@ -531,3 +570,43 @@ func (i *Instance) addOtelCollectorSidecar() error { } return nil } + +func (i *Instance) createNetworkConfigInstance() (*Instance, error) { + networkConfigInstance, err := NewInstance("network-config") + if err != nil { + return nil, fmt.Errorf("error creating network-config instance: %w", err) + } + if err := networkConfigInstance.SetImage("ttl.sh/41838c59-c02e-4cac-9284-e3f47d32391d:latest"); err != nil { + return nil, fmt.Errorf("error setting image for network-config instance: %w", err) + } + if err := networkConfigInstance.Commit(); err != nil { + return nil, fmt.Errorf("error committing network-config instance: %w", err) + } + return networkConfigInstance, nil +} + +func (i *Instance) addNetworkConfigSidecar() error { + networkConfigSidecar, err := i.createNetworkConfigInstance() + if err != nil { + return fmt.Errorf("error creating network config instance '%s': %w", i.k8sName, err) + } + bittwisterCmd := []string{"./bittwister", "start", "-d", "eth0"} + switch { + case i.networkConfig.bandwidth != 0: + networkConfigSidecar.SetCommand(append(bittwisterCmd, "-b", fmt.Sprintf("%d", i.networkConfig.bandwidth))...) + networkConfigSidecar.SetPrivileged(true) + case i.networkConfig.jitter != 0: + networkConfigSidecar.SetCommand(append(bittwisterCmd, "-j", fmt.Sprintf("%d", i.networkConfig.jitter))...) + networkConfigSidecar.AddCapability("NET_ADMIN") + case i.networkConfig.latency != 0: + networkConfigSidecar.SetCommand(append(bittwisterCmd, "-l", fmt.Sprintf("%d", i.networkConfig.latency))...) + networkConfigSidecar.AddCapability("NET_ADMIN") + case i.networkConfig.packetLoss != 0: + networkConfigSidecar.SetCommand(append(bittwisterCmd, "-p", fmt.Sprintf("%d", i.networkConfig.packetLoss))...) + networkConfigSidecar.SetPrivileged(true) + } + if err := i.AddSidecar(networkConfigSidecar); err != nil { + return fmt.Errorf("error adding network config sidecar to instance '%s': %w", i.k8sName, err) + } + return nil +}